You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/03/12 16:44:13 UTC

[5/8] qpid-broker-j git commit: QPID-8123: [Broker-J] [BDB System Tests] Refactor MultiNodeTest and TwoNodeTest to remove knowledge of Qpid JMS AMQP 0-x client (code dependency and format of failover url).

QPID-8123: [Broker-J] [BDB System Tests] Refactor MultiNodeTest and TwoNodeTest to remove knowledge of Qpid JMS AMQP 0-x client (code dependency and format of failover url).


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c09a9a31
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c09a9a31
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c09a9a31

Branch: refs/heads/master
Commit: c09a9a316fdadcf3b8af21661fbc5ccb3e1baa0a
Parents: 7ce5480
Author: Keith Wall <kw...@apache.org>
Authored: Sat Mar 10 00:04:37 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Mar 12 16:24:17 2018 +0000

----------------------------------------------------------------------
 .../berkeleydb/replication/GroupCreator.java    |  62 ++-
 .../berkeleydb/replication/MultiNodeTest.java   | 426 +++++++++----------
 .../berkeleydb/replication/TwoNodeTest.java     |  78 +++-
 .../apache/qpid/systests/ConnectionBuilder.java |   8 +-
 .../systests/GenericConnectionListener.java     |  29 ++
 .../org/apache/qpid/systests/JmsProvider.java   |   5 +
 .../QpidJmsClient0xConnectionBuilder.java       | 141 +++---
 .../qpid/systests/QpidJmsClient0xProvider.java  |  75 +++-
 .../QpidJmsClientConnectionBuilder.java         |  64 ++-
 .../qpid/systests/QpidJmsClientProvider.java    |  64 +++
 .../qpid/test/utils/QpidBrokerTestCase.java     |   8 +-
 test-profiles/CPPExcludes                       |   2 -
 test-profiles/Java010Excludes                   |   2 -
 test-profiles/Java10Excludes                    |  10 -
 14 files changed, 628 insertions(+), 346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
index 357c28d..2cc5484 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
@@ -50,8 +50,8 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
@@ -60,20 +60,18 @@ import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationN
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl;
 import org.apache.qpid.systest.rest.RestTestHelper;
+import org.apache.qpid.systests.ConnectionBuilder;
 import org.apache.qpid.test.utils.BrokerHolder;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.test.utils.TestBrokerConfiguration;
 
 public class GroupCreator
 {
-    protected static final Logger LOGGER = LoggerFactory.getLogger(GroupCreator.class);
-
-    private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
-    private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
+    private static final Logger LOGGER = LoggerFactory.getLogger(GroupCreator.class);
 
     private static final int FAILOVER_CYCLECOUNT = 40;
     private static final int FAILOVER_RETRIES = 0;
-    private static final int FAILOVER_CONNECTDELAY = 250;
+    private static final int FAILOVER_CONNECTDELAY = 1000;
 
     private final QpidBrokerTestCase _testcase;
     private final String _virtualHostName;
@@ -258,8 +256,7 @@ public class GroupCreator
 
     public int getBrokerPortNumberFromConnection(Connection connection)
     {
-        final AMQConnection amqConnection = (AMQConnection)connection;
-        return amqConnection.getActiveBrokerDetails().getPort();
+        return _testcase.getJmsProvider().getConnectedURI(connection).getPort();
     }
 
     public int getPortNumberOfAnInactiveBroker(final Connection activeConnection)
@@ -282,27 +279,43 @@ public class GroupCreator
         return ports;
     }
 
-    public String getConnectionUrlForAllClusterNodes() throws Exception
+    public ConnectionBuilder getConnectionBuilderForAllClusterNodes() throws Exception
     {
-        return  getConnectionUrlForAllClusterNodes(FAILOVER_CONNECTDELAY, FAILOVER_RETRIES, FAILOVER_CYCLECOUNT);
+        return getConnectionBuilderForAllClusterNodes(FAILOVER_CONNECTDELAY, FAILOVER_RETRIES, FAILOVER_CYCLECOUNT);
     }
 
-    public String getConnectionUrlForAllClusterNodes(int connectDelay, int retries, final int cyclecount) throws Exception
+    public ConnectionBuilder getConnectionBuilderForAllClusterNodes(int connectDelay, int retries, final int cyclecount) throws Exception
     {
-        final StringBuilder brokerList = new StringBuilder();
+        final ConnectionBuilder connectionBuilder = _testcase.getConnectionBuilder();
+        connectionBuilder.setFailoverReconnectDelay(connectDelay);
+        connectionBuilder.setVirtualHost(_virtualHostName);
+        connectionBuilder.setFailover(true);
+
+        final int reconnectAttempts = (retries == 0 ? 1 : retries) * (cyclecount == 0 ? 1 : cyclecount);
+        connectionBuilder.setFailoverReconnectAttempts(reconnectAttempts);
 
-        for(Iterator<Integer> itr = _members.keySet().iterator(); itr.hasNext(); )
+        final Iterator<Integer> iterator = _members.keySet().iterator();
+        if (iterator.hasNext())
         {
-            int brokerPortNumber = itr.next();
+            final int firstBroker = iterator.next();
+            connectionBuilder.setPort(firstBroker);
+        }
 
-            brokerList.append(String.format(BROKER_PORTION_FORMAT, brokerPortNumber, connectDelay, retries));
-            if (itr.hasNext())
-            {
-                brokerList.append(";");
-            }
+        while (iterator.hasNext())
+        {
+            int brokerPortNumber = iterator.next();
+            connectionBuilder.addFailoverPort(brokerPortNumber);
+        }
+
+        if (_testcase.getBrokerProtocol().equals(Protocol.AMQP_1_0))
+        {
+            connectionBuilder.setOptions(Collections.singletonMap("failover.warnAfterReconnectAttempts", "1"));
+            // TODO - workaround for the fact that the client does not respect reconnectDelay if the
+            // server closes the connection gracefully.
+            connectionBuilder.setOptions(Collections.singletonMap("failover.initialReconnectDelay", "15000"));
         }
 
-        return String.format(MANY_BROKER_URL_FORMAT, _virtualHostName, brokerList, cyclecount);
+        return connectionBuilder;
     }
 
     public String getGroupName()
@@ -453,8 +466,13 @@ public class GroupCreator
                 Thread.sleep(1000);
             }
         }
-        LOGGER.debug("Node '" + getNodeNameForBrokerPort(remoteNodePort) + "' attribute  '" + attributeName + "' is " + attributeValue);
-        Assert.assertTrue("Unexpected " + attributeName + " at " + localNodePort, desiredValues.contains(attributeValue));
+        LOGGER.debug("Node '{}' attribute  '{}' value '{}'", getNodeNameForBrokerPort(remoteNodePort), attributeName, attributeValue);
+        Assert.assertTrue(String.format("Node port %d:  Attribute '%s' has unexpected value '%s', desired values [%s]",
+                                        localNodePort,
+                                        attributeName,
+                                        attributeValue,
+                                        desiredValues),
+                          desiredValues.contains(attributeValue));
     }
 
     public RestTestHelper createRestTestHelper(int brokerPort)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
index f86f467..d541e1a 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
@@ -20,6 +20,7 @@
 package org.apache.qpid.server.store.berkeleydb.replication;
 
 import java.io.File;
+import java.net.URI;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,7 +39,6 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -51,33 +51,35 @@ import com.sleepycat.je.rep.ReplicationConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.util.FileUtils;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.NodeRole;
+import org.apache.qpid.systests.ConnectionBuilder;
+import org.apache.qpid.systests.GenericConnectionListener;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.test.utils.TestUtils;
-import org.apache.qpid.util.FileUtils;
 
 public class MultiNodeTest extends QpidBrokerTestCase
 {
-    protected static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeTest.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeTest.class);
 
     private static final String VIRTUAL_HOST = "test";
     private static final int NUMBER_OF_NODES = 3;
+    private static final int FAILOVER_COMPLETION_TIMEOUT = 60000;
 
     private GroupCreator _groupCreator;
 
     private FailoverAwaitingListener _failoverListener;
 
     /** Used when expectation is client will (re)-connect */
-    private String _positiveFailoverUrl;
+    private ConnectionBuilder _positiveFailoverBuilder;
 
     /** Used when expectation is client will not (re)-connect */
-    private String _negativeFailoverUrl;
+    private ConnectionBuilder _negativeFailoverBuilder;
 
     @Override
     protected void setUp() throws Exception
@@ -88,8 +90,8 @@ public class MultiNodeTest extends QpidBrokerTestCase
         _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
         _groupCreator.configureClusterNodes();
 
-        _positiveFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes();
-        _negativeFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(200, 0, 2);
+        _positiveFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes();
+        _negativeFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(200, 0, 2);
 
         _groupCreator.startCluster();
         _failoverListener = new FailoverAwaitingListener();
@@ -105,16 +107,15 @@ public class MultiNodeTest extends QpidBrokerTestCase
 
     public void testLossOfMasterNodeCausesClientToFailover() throws Exception
     {
-        final Connection connection = getConnection(_positiveFailoverUrl);
-
-        ((AMQConnection)connection).setConnectionListener(_failoverListener);
+        final Connection connection = _positiveFailoverBuilder.build();
+        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
         final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port " + activeBrokerPort);
+        LOGGER.info("Active connection port {}", activeBrokerPort);
 
         _groupCreator.stopNode(activeBrokerPort);
         LOGGER.info("Node is stopped");
-        _failoverListener.awaitFailoverCompletion(20000);
+        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Listener has finished");
         // any op to ensure connection remains
         connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -122,25 +123,36 @@ public class MultiNodeTest extends QpidBrokerTestCase
 
     public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception
     {
-        final Connection connection = getConnection(_positiveFailoverUrl);
+        final Connection connection = _positiveFailoverBuilder.build();
+        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
-        ((AMQConnection)connection).setConnectionListener(_failoverListener);
         final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port " + activeBrokerPort);
-        final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
+        LOGGER.info("Active connection port {}", activeBrokerPort);
 
-        LOGGER.info("Stopping inactive broker on port " + inactiveBrokerPort);
+        final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
+        LOGGER.info("Stopping inactive broker on port {} ", inactiveBrokerPort);
 
         _groupCreator.stopNode(inactiveBrokerPort);
 
         _failoverListener.assertNoFailoverCompletionWithin(2000);
 
-        assertProducingConsuming(connection);
+        // any op to ensure connection remains
+        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
 
     public void testLossOfQuorumCausesClientDisconnection() throws Exception
     {
-        final Connection connection = getConnection(_negativeFailoverUrl);
+        if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
+        {
+            // TODO - there seems to be a client defect when a JMS operation is interrupted
+            // by a graceful connection close from the client side.
+            return;
+        }
+
+        final Connection connection = _negativeFailoverBuilder.build();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Destination destination = session.createQueue(getTestQueueName());
+        getJmsProvider().createQueue(session, getTestQueueName());
 
         Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
 
@@ -155,10 +167,9 @@ public class MultiNodeTest extends QpidBrokerTestCase
 
         try
         {
-            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-            Destination destination = session.createQueue(getTestQueueName());
-            session.createConsumer(destination).close();
-            fail("Exception not thrown - creating durable queue should fail without quorum");
+
+            sendMessage(session, destination, 1);
+            fail("Exception not thrown - sending message within a transaction should fail without quorum");
         }
         catch(JMSException jms)
         {
@@ -168,7 +179,7 @@ public class MultiNodeTest extends QpidBrokerTestCase
         // New connections should now fail as vhost will be unavailable
         try
         {
-            Connection unexpectedConnection = getConnection(_negativeFailoverUrl);
+            Connection unexpectedConnection = _negativeFailoverBuilder.build();
             fail("Got unexpected connection to node in group without quorum " + unexpectedConnection);
         }
         catch (JMSException je)
@@ -184,9 +195,13 @@ public class MultiNodeTest extends QpidBrokerTestCase
      */
     public void testQuorumLostAndRestored_OriginalMasterRejoinsTheGroup() throws Exception
     {
-        final Connection connection = getConnection(_positiveFailoverUrl);
+        final Connection connection = _positiveFailoverBuilder.build();
+        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
-        ((AMQConnection)connection).setConnectionListener(_failoverListener);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        Destination dest = session.createQueue(getTestQueueName());
+        session.close();
 
         Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
 
@@ -196,7 +211,6 @@ public class MultiNodeTest extends QpidBrokerTestCase
         Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED);
         Session session2 = connection.createSession(true, Session.SESSION_TRANSACTED);
 
-        Destination dest = session1.createQueue(getTestQueueName());
         session1.createConsumer(dest).close();
 
         MessageProducer producer1 = session1.createProducer(dest);
@@ -221,32 +235,34 @@ public class MultiNodeTest extends QpidBrokerTestCase
             _groupCreator.startNode(p);
         }
 
-        _failoverListener.awaitFailoverCompletion(20000);
+        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
 
         _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "MASTER", "REPLICA");
     }
 
     public void testPersistentMessagesAvailableAfterFailover() throws Exception
     {
-        final Connection connection = getConnection(_positiveFailoverUrl);
+        final Connection connection = _positiveFailoverBuilder.build();
+        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
-        ((AMQConnection)connection).setConnectionListener(_failoverListener);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        Destination queue = session.createQueue(getTestQueueName());
+        session.close();
 
         final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
 
         Session producingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
-        Destination queue = producingSession.createQueue(getTestQueueName());
-        producingSession.createConsumer(queue).close();
         sendMessage(producingSession, queue, 10);
 
         _groupCreator.stopNode(activeBrokerPort);
-        LOGGER.info("Old master (broker port " + activeBrokerPort + ") is stopped");
+        LOGGER.info("Old master (broker port {}) is stopped", activeBrokerPort);
 
-        _failoverListener.awaitFailoverCompletion(20000);
+        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Failover has finished");
 
         final int activeBrokerPortAfterFailover = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("New master (broker port " + activeBrokerPort + ") after failover");
+        LOGGER.info("New master (broker port {}) after failover", activeBrokerPortAfterFailover);
 
         Session consumingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
         MessageConsumer consumer = consumingSession.createConsumer(queue);
@@ -254,7 +270,7 @@ public class MultiNodeTest extends QpidBrokerTestCase
         connection.start();
         for(int i = 0; i < 10; i++)
         {
-            Message m = consumer.receive(RECEIVE_TIMEOUT);
+            Message m = consumer.receive(getReceiveTimeout());
             assertNotNull("Message " + i + "  is not received", m);
             assertEquals("Unexpected message received", i, m.getIntProperty(INDEX));
         }
@@ -263,13 +279,17 @@ public class MultiNodeTest extends QpidBrokerTestCase
 
     public void testTransferMasterFromLocalNode() throws Exception
     {
-        final Connection connection = getConnection(_positiveFailoverUrl);
+        final Connection connection = _positiveFailoverBuilder.build();
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        session.close();
 
         final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port " + activeBrokerPort);
+        LOGGER.info("Active connection port {}", activeBrokerPort);
 
         final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
-        LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+        LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
 
         // transfer mastership 3 times in order to verify
         // that repeated mastership transfer to the same node works, See QPID-6996
@@ -283,14 +303,13 @@ public class MultiNodeTest extends QpidBrokerTestCase
                                              final int activeBrokerPort) throws Exception
     {
         _failoverListener = new FailoverAwaitingListener();
-        ((AMQConnection)connection).setConnectionListener(_failoverListener);
+        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
         Map<String, Object> attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
         assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
-        _groupCreator.setNodeAttributes(inactiveBrokerPort,
-                                          Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+        _groupCreator.setNodeAttributes(inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
 
-        _failoverListener.awaitFailoverCompletion(20000);
+        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Listener has finished");
 
         attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
@@ -303,15 +322,17 @@ public class MultiNodeTest extends QpidBrokerTestCase
 
     public void testTransferMasterFromRemoteNode() throws Exception
     {
-        final Connection connection = getConnection(_positiveFailoverUrl);
+        final Connection connection = _positiveFailoverBuilder.build();
 
-        ((AMQConnection)connection).setConnectionListener(_failoverListener);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        session.close();
 
         final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port " + activeBrokerPort);
+        LOGGER.info("Active connection port {}", activeBrokerPort);
 
         final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
-        LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+        LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
 
         // transfer mastership 3 times in order to verify
         // that repeated mastership transfer to the same node works, See QPID-6996
@@ -325,15 +346,15 @@ public class MultiNodeTest extends QpidBrokerTestCase
                                               final int inactiveBrokerPort) throws Exception
     {
         _failoverListener = new FailoverAwaitingListener();
-        ((AMQConnection)connection).setConnectionListener(_failoverListener);
+        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
         _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
         Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
         assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
 
-        _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+        _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
 
-        _failoverListener.awaitFailoverCompletion(20000);
+        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Listener has finished");
 
         attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
@@ -346,12 +367,11 @@ public class MultiNodeTest extends QpidBrokerTestCase
 
     public void testTransferMasterWhilstMessagesInFlight() throws Exception
     {
-        final Connection connection = getConnection(_positiveFailoverUrl);
-        ((AMQConnection) connection).setConnectionListener(_failoverListener);
-
-        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        final Connection connection = _positiveFailoverBuilder.build();
+        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
         final Destination destination = session.createQueue(getTestQueueName());
-        session.createConsumer(destination).close();
 
         final AtomicBoolean masterTransferred = new AtomicBoolean(false);
         final AtomicBoolean keepRunning = new AtomicBoolean(true);
@@ -360,58 +380,52 @@ public class MultiNodeTest extends QpidBrokerTestCase
         final CountDownLatch producedOneAfter = new CountDownLatch(1);
         final CountDownLatch workerShutdown = new CountDownLatch(1);
 
-        Runnable producer = new Runnable()
-        {
-            @Override
-            public void run()
+        Runnable producer = () -> {
+            try
             {
-                try
-                {
-                    int count = 0;
-                    MessageProducer producer = session.createProducer(destination);
+                int count = 0;
+                MessageProducer producer1 = session.createProducer(destination);
 
-                    while (keepRunning.get())
+                while (keepRunning.get())
+                {
+                    String messageText = "message" + count;
+                    try
                     {
-                        String messageText = "message" + count;
-                        try
-                        {
-                            Message message = session.createTextMessage(messageText);
-                            producer.send(message);
-                            session.commit();
-                            LOGGER.debug("Sent message " + count);
+                        Message message = session.createTextMessage(messageText);
+                        producer1.send(message);
+                        session.commit();
+                        LOGGER.debug("Sent message " + count);
 
-                            producedOneBefore.countDown();
+                        producedOneBefore.countDown();
 
-                            if (masterTransferred.get())
-                            {
-                                producedOneAfter.countDown();
-                            }
-                            count++;
-                        }
-                        catch (javax.jms.IllegalStateException ise)
-                        {
-                            throw ise;
-                        }
-                        catch (TransactionRolledBackException trbe)
-                        {
-                            // Pass - failover in prgoress
-                        }
-                        catch(JMSException je)
+                        if (masterTransferred.get())
                         {
-                            // Pass - failover in progress
+                            producedOneAfter.countDown();
                         }
+                        count++;
+                    }
+                    catch (javax.jms.IllegalStateException ise)
+                    {
+                        throw ise;
+                    }
+                    catch (TransactionRolledBackException trbe)
+                    {
+                        // Pass - failover in progress
+                    }
+                    catch(JMSException je)
+                    {
+                        // Pass - failover in progress
                     }
-                }
-                catch (Exception e)
-                {
-                    workerException.set(e);
-                }
-                finally
-                {
-                    workerShutdown.countDown();
                 }
             }
-
+            catch (Exception e)
+            {
+                workerException.set(e);
+            }
+            finally
+            {
+                workerShutdown.countDown();
+            }
         };
 
         Thread backgroundWorker = new Thread(producer);
@@ -421,18 +435,18 @@ public class MultiNodeTest extends QpidBrokerTestCase
         assertTrue(workerRunning);
 
         final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port " + activeBrokerPort);
+        LOGGER.info("Active connection port {}", activeBrokerPort);
 
         final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
-        LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+        LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
 
         _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
         Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
         assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
 
-        _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+        _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
 
-        _failoverListener.awaitFailoverCompletion(20000);
+        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Failover has finished");
 
         attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
@@ -454,35 +468,39 @@ public class MultiNodeTest extends QpidBrokerTestCase
         assertNull(workerException.get());
 
         assertNotNull(session.createTemporaryQueue());
-
     }
 
     public void testInFlightTransactionsWhilstMajorityIsLost() throws Exception
     {
+        if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
+        {
+            // TODO - there seems to be a client defect when a JMS operation is interrupted
+            // by a graceful connection close from the client side.
+            return;
+        }
+
         int connectionNumber = Integer.getInteger("MultiNodeTest.testInFlightTransactionsWhilstMajorityIsLost.numberOfConnections", 20);
-        ExecutorService executorService = Executors.newFixedThreadPool(connectionNumber + NUMBER_OF_NODES -1);
+        ExecutorService executorService = Executors.newFixedThreadPool(connectionNumber + NUMBER_OF_NODES - 1);
         try
         {
-            String connectionUrl = _groupCreator.getConnectionUrlForAllClusterNodes(100, 0, 100);
+            final ConnectionBuilder builder = _groupCreator.getConnectionBuilderForAllClusterNodes(100, 0, 100);
+            final Connection consumerConnection = builder.build();
+            Session s = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+            getJmsProvider().createQueue(s, getTestQueueName());
+            s.close();
 
-            final Connection consumerConnection = getConnection(connectionUrl);
             consumerConnection.start();
 
             final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             final Destination destination = consumerSession.createQueue(getTestQueueName());
-            consumerSession.createConsumer(destination).setMessageListener(new MessageListener()
-            {
-                @Override
-                public void onMessage(final Message message)
+            consumerSession.createConsumer(destination).setMessageListener(message -> {
+                try
                 {
-                    try
-                    {
-                        LOGGER.info("Message received: " + ((TextMessage) message).getText());
-                    }
-                    catch (JMSException e)
-                    {
-                        LOGGER.error("Failure to get message text", e);
-                    }
+                    LOGGER.info("Message received: " + ((TextMessage) message).getText());
+                }
+                catch (JMSException e)
+                {
+                    LOGGER.error("Failure to get message text", e);
                 }
             });
 
@@ -490,7 +508,7 @@ public class MultiNodeTest extends QpidBrokerTestCase
             final Session[] sessions = new Session[connectionNumber];
             for (int i = 0; i < sessions.length; i++)
             {
-                connections[i] = getConnection(connectionUrl);
+                connections[i] = builder.build();
                 sessions[i] = connections[i].createSession(true, Session.SESSION_TRANSACTED);
                 LOGGER.info("Session {} is created", i);
             }
@@ -512,8 +530,8 @@ public class MultiNodeTest extends QpidBrokerTestCase
 
                 for (int i = 0; i < sessions.length; i++)
                 {
-                    AMQConnection connection = (AMQConnection)connections[i];
-                    connection.setConnectionListener(failoverListener);
+                    Connection connection = connections[i];
+                    getJmsProvider().addGenericConnectionListener(connection, failoverListener);
 
                     MessageProducer producer = sessions[i].createProducer(destination);
                     Message message = sessions[i].createTextMessage(messageText + "-" + i);
@@ -523,7 +541,7 @@ public class MultiNodeTest extends QpidBrokerTestCase
                 LOGGER.info("All publishing sessions have uncommitted transactions");
 
                 final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connections[0]);
-                LOGGER.info("Active connection port " + activeBrokerPort);
+                LOGGER.info("Active connection port {}", activeBrokerPort);
 
                 List<Integer> inactivePorts = new ArrayList<>(ports);
                 inactivePorts.remove(new Integer(activeBrokerPort));
@@ -534,27 +552,22 @@ public class MultiNodeTest extends QpidBrokerTestCase
                     final int inactiveBrokerPort = port;
                     LOGGER.info("Stop node for inactive broker on port " + inactiveBrokerPort);
 
-                    executorService.submit(new Runnable()
-                    {
-                        @Override
-                        public void run()
+                    executorService.submit(() -> {
+                        try
                         {
-                            try
-                            {
-                                _groupCreator.setNodeAttributes(inactiveBrokerPort,
-                                                                inactiveBrokerPort,
-                                                                Collections.<String, Object>singletonMap(
-                                                                        BDBHAVirtualHostNode.DESIRED_STATE,
-                                                                        State.STOPPED.name()));
-                            }
-                            catch (Exception e)
-                            {
-                                LOGGER.error("Failed to stop node on broker with port " + inactiveBrokerPort, e);
-                            }
-                            finally
-                            {
-                                latch.countDown();
-                            }
+                            _groupCreator.setNodeAttributes(inactiveBrokerPort,
+                                                            inactiveBrokerPort,
+                                                            Collections.singletonMap(
+                                                                    BDBHAVirtualHostNode.DESIRED_STATE,
+                                                                    State.STOPPED.name()));
+                        }
+                        catch (Exception e)
+                        {
+                            LOGGER.error("Failed to stop node on broker with port {}", inactiveBrokerPort, e);
+                        }
+                        finally
+                        {
+                            latch.countDown();
                         }
                     });
                 }
@@ -562,22 +575,16 @@ public class MultiNodeTest extends QpidBrokerTestCase
                 latch.await(500, TimeUnit.MILLISECONDS);
 
                 LOGGER.info("Committing transactions in parallel to provoke a lot of syncing to disk");
-                for (int i = 0; i < sessions.length; i++)
+                for (final Session session : sessions)
                 {
-                    final Session session = sessions[i];
-                    executorService.submit(new Runnable()
-                    {
-                        @Override
-                        public void run()
+                    executorService.submit(() -> {
+                        try
+                        {
+                            session.commit();
+                        }
+                        catch (JMSException e)
                         {
-                            try
-                            {
-                                session.commit();
-                            }
-                            catch (JMSException e)
-                            {
-                                // majority of commits might fail due to insufficient replicas
-                            }
+                            // majority of commits might fail due to insufficient replicas
                         }
                     });
                 }
@@ -596,7 +603,7 @@ public class MultiNodeTest extends QpidBrokerTestCase
                     {
                         _groupCreator.setNodeAttributes(port,
                                                         port,
-                                                        Collections.<String, Object>singletonMap(
+                                                        Collections.singletonMap(
                                                                 BDBHAVirtualHostNode.DESIRED_STATE,
                                                                 State.ACTIVE.name()));
                     }
@@ -635,8 +642,11 @@ public class MultiNodeTest extends QpidBrokerTestCase
      */
     public void testQuorumOverride() throws Exception
     {
-        final Connection connection = getConnection(_positiveFailoverUrl);
-        ((AMQConnection)connection).setConnectionListener(_failoverListener);
+        final Connection connection = _positiveFailoverBuilder.build();
+        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        session.close();
 
         Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
 
@@ -650,17 +660,17 @@ public class MultiNodeTest extends QpidBrokerTestCase
         }
 
         LOGGER.info("Awaiting failover to start");
-        _failoverListener.awaitPreFailover(20000);
+        _failoverListener.awaitPreFailover(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Failover has begun");
 
         Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort);
         assertEquals("Broker has unexpected quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
-        _groupCreator.setNodeAttributes(activeBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1));
+        _groupCreator.setNodeAttributes(activeBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1));
 
         attributes = _groupCreator.getNodeAttributes(activeBrokerPort);
         assertEquals("Broker has unexpected quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
 
-        _failoverListener.awaitFailoverCompletion(20000);
+        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Failover has finished");
 
         assertProducingConsuming(connection);
@@ -668,12 +678,14 @@ public class MultiNodeTest extends QpidBrokerTestCase
 
     public void testPriority() throws Exception
     {
-        final Connection connection = getConnection(_positiveFailoverUrl);
-
-        ((AMQConnection)connection).setConnectionListener(_failoverListener);
+        final Connection connection = _positiveFailoverBuilder.build();
+        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        session.close();
 
         final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port " + activeBrokerPort);
+        LOGGER.info("Active connection port {}", activeBrokerPort);
 
         int priority = 1;
         Integer highestPriorityBrokerPort = null;
@@ -690,7 +702,7 @@ public class MultiNodeTest extends QpidBrokerTestCase
             }
         }
 
-        LOGGER.info("Broker on port " + highestPriorityBrokerPort + " has the highest priority of " + priority);
+        LOGGER.info("Broker on port {} has the highest priority of {}", highestPriorityBrokerPort, priority);
 
         // make sure all remote nodes are materialized on the master
         // in order to make sure that DBPing is not invoked
@@ -725,7 +737,7 @@ public class MultiNodeTest extends QpidBrokerTestCase
         LOGGER.info("Shutting down the MASTER");
         _groupCreator.stopNode(activeBrokerPort);
 
-        _failoverListener.awaitFailoverCompletion(20000);
+        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Listener has finished");
 
         Map<String, Object> attributes = _groupCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort);
@@ -756,25 +768,14 @@ public class MultiNodeTest extends QpidBrokerTestCase
                                                    Durability.SyncPolicy.WRITE_NO_SYNC,
                                                    Durability.ReplicaAckPolicy.SIMPLE_MAJORITY));
 
-            ReplicatedEnvironment intruder = null;
             final String currentThreadName = Thread.currentThread().getName();
-            try
+            try(ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig))
             {
-                intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
+                LOGGER.debug("Intruder started");
             }
             finally
             {
-                try
-                {
-                    if (intruder != null)
-                    {
-                        intruder.close();
-                    }
-                }
-                finally
-                {
-                    Thread.currentThread().setName(currentThreadName);
-                }
+                Thread.currentThread().setName(currentThreadName);
             }
 
             for (int port : _groupCreator.getBrokerPortNumbersForNodes())
@@ -802,29 +803,7 @@ public class MultiNodeTest extends QpidBrokerTestCase
         }
     }
 
-    private void awaitNextTransaction(final int brokerPort) throws Exception
-    {
-        Map<String, Object> attributes = _groupCreator.getNodeAttributes(brokerPort);
-        final int originalTransactionId = (int) attributes.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID);
-        int currentTransactionId = 0;
-        long timeout = System.currentTimeMillis() + 60000;
-        LOGGER.debug("Awaiting next transaction. Original transaction id {}", originalTransactionId);
-        do
-        {
-            Thread.sleep(250);
-            attributes = _groupCreator.getNodeAttributes(brokerPort);
-            currentTransactionId = (int) attributes.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID);
-            LOGGER.debug("Current transaction id {}", currentTransactionId);
-        }
-        while (originalTransactionId >= currentTransactionId && timeout > System.currentTimeMillis());
-
-        assertTrue("Group transaction has not occurred within timeout."
-                   + "Current transaction id " + currentTransactionId
-                   + "Original transaction id " + originalTransactionId,
-                   currentTransactionId > originalTransactionId);
-    }
-
-    private final class FailoverAwaitingListener implements ConnectionListener
+    private final class FailoverAwaitingListener implements GenericConnectionListener
     {
         private final CountDownLatch _failoverCompletionLatch;
         private final CountDownLatch _preFailoverLatch;
@@ -842,20 +821,19 @@ public class MultiNodeTest extends QpidBrokerTestCase
         }
 
         @Override
-        public boolean preResubscribe()
+        public void onConnectionInterrupted(URI uri)
         {
-            return true;
+            _failoverStarted = true;
+            _preFailoverLatch.countDown();
         }
 
         @Override
-        public synchronized boolean preFailover(boolean redirect)
+        public void onConnectionRestored(URI uri)
         {
-            _failoverStarted = true;
-            _preFailoverLatch.countDown();
-            return true;
+            _failoverCompletionLatch.countDown();
         }
 
-        public void awaitFailoverCompletion(long delay) throws InterruptedException
+        void awaitFailoverCompletion(long delay) throws InterruptedException
         {
             if (!_failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS))
             {
@@ -869,35 +847,19 @@ public class MultiNodeTest extends QpidBrokerTestCase
             assertEquals("Failover did not occur", 0, _failoverCompletionLatch.getCount());
         }
 
-        public void assertNoFailoverCompletionWithin(long delay) throws InterruptedException
+        void assertNoFailoverCompletionWithin(long delay) throws InterruptedException
         {
             _failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS);
             assertEquals("Failover occurred unexpectedly", 1L, _failoverCompletionLatch.getCount());
         }
 
-        public void awaitPreFailover(long delay) throws InterruptedException
+        void awaitPreFailover(long delay) throws InterruptedException
         {
             boolean complete = _preFailoverLatch.await(delay, TimeUnit.MILLISECONDS);
             assertTrue("Failover was expected to begin within " + delay + " ms.", complete);
         }
 
-        @Override
-        public void failoverComplete()
-        {
-            _failoverCompletionLatch.countDown();
-        }
-
-        @Override
-        public void bytesSent(long count)
-        {
-        }
-
-        @Override
-        public void bytesReceived(long count)
-        {
-        }
-
-        public synchronized boolean isFailoverStarted()
+        synchronized boolean isFailoverStarted()
         {
             return _failoverStarted;
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
index 31d88ca..385417f 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
@@ -19,14 +19,16 @@
  */
 package org.apache.qpid.server.store.berkeleydb.replication;
 
-import java.io.File;
 import java.util.Collections;
 import java.util.Map;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
+import javax.jms.Session;
 
+import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.systests.ConnectionBuilder;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class TwoNodeTest extends QpidBrokerTestCase
@@ -38,10 +40,10 @@ public class TwoNodeTest extends QpidBrokerTestCase
     private GroupCreator _groupCreator;
 
     /** Used when expectation is client will (re)-connect */
-    private String _positiveFailoverUrl;
+    private ConnectionBuilder _positiveFailoverBuilder;
 
     /** Used when expectation is client will not (re)-connect */
-    private String _negativeFailoverUrl;
+    private ConnectionBuilder _negativeFailoverBuilder;
 
     @Override
     protected void setUp() throws Exception
@@ -62,24 +64,28 @@ public class TwoNodeTest extends QpidBrokerTestCase
 
     private void startCluster(boolean designedPrimary) throws Exception
     {
-        setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
         _groupCreator.configureClusterNodes();
         _groupCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary);
-        _positiveFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes();
-        _negativeFailoverUrl = _groupCreator.getConnectionUrlForAllClusterNodes(200, 0, 2);
+        _positiveFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes();
+        _negativeFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(200, 0, 2);
         _groupCreator.startCluster();
     }
 
     public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception
     {
         startCluster(true);
-        final Connection initialConnection = getConnection(_positiveFailoverUrl);
+
+        final Connection initialConnection = _positiveFailoverBuilder.build();
+        Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        session.close();
+
         int masterPort = _groupCreator.getBrokerPortNumberFromConnection(initialConnection);
         assertProducingConsuming(initialConnection);
         initialConnection.close();
         _groupCreator.stopCluster();
         _groupCreator.startNode(masterPort);
-        final Connection secondConnection = getConnection(_positiveFailoverUrl);
+        final Connection secondConnection = _positiveFailoverBuilder.build();
         assertProducingConsuming(secondConnection);
         secondConnection.close();
     }
@@ -87,12 +93,17 @@ public class TwoNodeTest extends QpidBrokerTestCase
     public void testClusterRestartWithoutDesignatedPrimary() throws Exception
     {
         startCluster(false);
-        final Connection initialConnection = getConnection(_positiveFailoverUrl);
+
+        final Connection initialConnection = _positiveFailoverBuilder.build();
+        Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        session.close();
+
         assertProducingConsuming(initialConnection);
         initialConnection.close();
         _groupCreator.stopCluster();
         _groupCreator.startClusterParallel();
-        final Connection secondConnection = getConnection(_positiveFailoverUrl);
+        final Connection secondConnection = _positiveFailoverBuilder.build();
         assertProducingConsuming(secondConnection);
         secondConnection.close();
     }
@@ -101,19 +112,38 @@ public class TwoNodeTest extends QpidBrokerTestCase
     {
         startCluster(true);
         _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
-        final Connection connection = getConnection(_positiveFailoverUrl);
+
+        final Connection connection = _positiveFailoverBuilder.build();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        session.close();
+
         assertNotNull("Expected to get a valid connection to primary", connection);
         assertProducingConsuming(connection);
     }
 
     public void testPersistentOperationsFailOnNonDesignatedPrimaryAfterSecondaryStopped() throws Exception
     {
+        if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
+        {
+            // TODO - there seems to be a client defect when a JMS operation is interrupted
+            // by a graceful connection close from the client side.
+            return;
+        }
+
         startCluster(false);
+
+        final Connection initialConnection = _negativeFailoverBuilder.build();
+        Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        initialConnection.close();
+
         _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
 
         try
         {
-            Connection connection = getConnection(_negativeFailoverUrl);
+
+            Connection connection = _negativeFailoverBuilder.build();
             assertProducingConsuming(connection);
             fail("Exception not thrown");
         }
@@ -127,12 +157,19 @@ public class TwoNodeTest extends QpidBrokerTestCase
 
     public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception
     {
+        if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
+        {
+            // TODO - there seems to be a client defect when a JMS operation is interrupted
+            // by a graceful connection close from the client side.
+            return;
+        }
+
         startCluster(true);
         _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary());
 
         try
         {
-            getConnection(_negativeFailoverUrl);
+            _negativeFailoverBuilder.build();
             fail("Connection not expected");
         }
         catch (JMSException e)
@@ -158,6 +195,12 @@ public class TwoNodeTest extends QpidBrokerTestCase
     {
         startCluster(true);
 
+        final Connection initialConnection = _positiveFailoverBuilder.build();
+        Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        initialConnection.close();
+
+
         _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary());
 
         Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode());
@@ -174,7 +217,7 @@ public class TwoNodeTest extends QpidBrokerTestCase
         }
         assertTrue("Expected secondary to transition to primary within " + timeout, (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
 
-        final Connection connection = getConnection(_positiveFailoverUrl);
+        final Connection connection = _positiveFailoverBuilder.build();
         assertNotNull("Expected to get a valid connection to new primary", connection);
         assertProducingConsuming(connection);
     }
@@ -183,6 +226,11 @@ public class TwoNodeTest extends QpidBrokerTestCase
     {
         startCluster(false);
 
+        final Connection initialConnection = _positiveFailoverBuilder.build();
+        Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
+        getJmsProvider().createQueue(session, getTestQueueName());
+        initialConnection.close();
+
         _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
 
         Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary());
@@ -191,7 +239,7 @@ public class TwoNodeTest extends QpidBrokerTestCase
         _groupCreator.setNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary(), Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true));
         _groupCreator.awaitNodeToAttainRole(_groupCreator.getBrokerPortNumberOfPrimary(), "MASTER" );
 
-        final Connection connection = getConnection(_positiveFailoverUrl);
+        final Connection connection = _positiveFailoverBuilder.build();
         assertNotNull("Expected to get a valid connection to primary", connection);
         assertProducingConsuming(connection);
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/ConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/ConnectionBuilder.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/ConnectionBuilder.java
index 6e64fd2..34a7f62 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/ConnectionBuilder.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/ConnectionBuilder.java
@@ -34,14 +34,19 @@ public interface ConnectionBuilder
 
     ConnectionBuilder setHost(String host);
     ConnectionBuilder setPort(int port);
+
+    @Deprecated
     ConnectionBuilder setSslPort(int port);
+
     ConnectionBuilder setPrefetch(int prefetch);
     ConnectionBuilder setClientId(String clientId);
     ConnectionBuilder setUsername(String username);
     ConnectionBuilder setPassword(String password);
     ConnectionBuilder setVirtualHost(String virtualHostName);
     ConnectionBuilder setFailover(boolean enableFailover);
+    ConnectionBuilder addFailoverPort(int port);
     ConnectionBuilder setFailoverReconnectAttempts(int reconnectAttempts);
+    ConnectionBuilder setFailoverReconnectDelay(int connectDelay);
     ConnectionBuilder setTls(boolean enableTls);
     ConnectionBuilder setSyncPublish(boolean syncPublish);
     ConnectionBuilder setOptions(Map<String, String> options);
@@ -56,8 +61,9 @@ public interface ConnectionBuilder
     ConnectionBuilder setVerifyHostName(boolean verifyHostName);
     ConnectionBuilder setKeyAlias(String alias);
     ConnectionBuilder setSaslMechanisms(String... mechanism);
-    ConnectionBuilder setCompress(boolean compress);
 
+    ConnectionBuilder setCompress(boolean compress);
     Connection build() throws NamingException, JMSException;
+
     ConnectionFactory buildConnectionFactory() throws NamingException;
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/GenericConnectionListener.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/GenericConnectionListener.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/GenericConnectionListener.java
new file mode 100644
index 0000000..0d55d8a
--- /dev/null
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/GenericConnectionListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests;
+
+import java.net.URI;
+
+public interface GenericConnectionListener
+{
+    void onConnectionRestored(URI uri);
+    void onConnectionInterrupted(URI uri);
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsProvider.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsProvider.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsProvider.java
index beece11..7980d17 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsProvider.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsProvider.java
@@ -20,6 +20,7 @@
 
 package org.apache.qpid.systests;
 
+import java.net.URI;
 import java.net.URISyntaxException;
 
 import javax.jms.Connection;
@@ -49,4 +50,8 @@ public interface JmsProvider
     Topic createTopicOnFanout(Connection con, String topicName) throws JMSException, URISyntaxException;
 
     ConnectionBuilder getConnectionBuilder();
+
+    /**
+     * Registers a client-neutral listener on the given connection so that it is told when the
+     * connection is interrupted and when it is restored.
+     *
+     * @param connection                the connection to observe
+     * @param genericConnectionListener the listener to notify
+     */
+    void addGenericConnectionListener(Connection connection, GenericConnectionListener genericConnectionListener);
+
+    /**
+     * Returns the URI the given connection reports as the one it is currently connected to.
+     *
+     * @param connection the connection to query
+     * @return the URI as reported by (or derived from) the underlying client
+     */
+    URI getConnectedURI(Connection connection);
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xConnectionBuilder.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xConnectionBuilder.java
index 95e6134..262d20b 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xConnectionBuilder.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xConnectionBuilder.java
@@ -23,9 +23,12 @@ package org.apache.qpid.systests;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Hashtable;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -36,6 +39,8 @@ import javax.naming.NamingException;
 
 public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
 {
+    private final List<Integer> _failoverPorts = new ArrayList<>();
+
     private String _clientId = "clientid";
     private String _username = USERNAME;
     private String _password = PASSWORD;
@@ -44,6 +49,7 @@ public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
     private boolean _enableFailover;
     private final Map<String, Object> _options = new TreeMap<>();
     private int _reconnectAttempts = 20;
+    private int _connectdelay;
     private String _host = "localhost";
     private int _port;
     private int _sslPort;
@@ -70,6 +76,13 @@ public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
     }
 
     @Override
+    public ConnectionBuilder addFailoverPort(final int port)
+    {
+        // Only recorded here; when failover is enabled the URL builder adds one extra
+        // tcp://<host>:<port> brokerlist entry per recorded port (same host as the primary).
+        _failoverPorts.add(port);
+        return this;
+    }
+
+    @Override
     public ConnectionBuilder setSslPort(final int port)
     {
         _sslPort = port;
@@ -126,6 +139,13 @@ public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
     }
 
     @Override
+    public ConnectionBuilder setFailoverReconnectDelay(final int connectDelay)
+    {
+        // Emitted later by buildTransportQuery() as a connectdelay='<n>' option on every
+        // brokerlist entry, and only when the value is positive.
+        _connectdelay = connectDelay;
+        return this;
+    }
+
+    @Override
     public ConnectionBuilder setTls(final boolean enableTls)
     {
         _enableTls = enableTls;
@@ -285,73 +305,47 @@ public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
         {
             cUrlBuilder.append(_virtualHost);
         }
+        cUrlBuilder.append("?");
 
-        cUrlBuilder.append("?brokerlist='tcp://").append(_host).append(":");
-        if (_enableTls)
-        {
-            cUrlBuilder.append(_sslPort).append("?ssl='true'");
-            if (_keyStoreLocation != null)
-            {
-                cUrlBuilder.append("&key_store='").append(encodeBrokerOption(_keyStoreLocation)).append('\'');
-            }
-            if (_keyStorePassword != null)
-            {
-                cUrlBuilder.append("&key_store_password='").append(_keyStorePassword).append('\'');
-            }
-            if (_trustStoreLocation != null)
-            {
-                cUrlBuilder.append("&trust_store='").append(encodeBrokerOption(_trustStoreLocation)).append('\'');
-            }
-            if (_trustStorePassword != null)
-            {
-                cUrlBuilder.append("&trust_store_password='").append(_trustStorePassword).append('\'');
-            }
-            if (_verifyHostName != null)
-            {
-                cUrlBuilder.append("&ssl_verify_hostname='").append(_verifyHostName).append('\'');
-            }
-            if (_keyAlias != null)
-            {
-                cUrlBuilder.append("&ssl_cert_alias='").append(_keyAlias).append('\'');
-            }
-        }
-        else
-        {
-            cUrlBuilder.append(_port);
-        }
+        final List<Integer> copy = new ArrayList<>(_failoverPorts.size() + 1);
+        copy.add(_enableTls ? _sslPort : _port);
 
-        if (_saslMechanisms != null)
+        if (_enableFailover)
         {
-            if (_enableTls)
+            if (_failoverPorts.isEmpty())
             {
-                cUrlBuilder.append("&");
+                Integer testPortAlt;
+                if ((testPortAlt = Integer.getInteger("test.port.alt")) != null)
+                {
+                    copy.add(testPortAlt);
+                }
+                else if (_enableTls && (testPortAlt = Integer.getInteger("test.port.alt.ssl")) != null)
+                {
+                    copy.add(testPortAlt);
+                }
             }
             else
             {
-                cUrlBuilder.append("?");
+                copy.addAll(_failoverPorts);
             }
-            cUrlBuilder.append("sasl_mechs='").append(_saslMechanisms).append('\'');
         }
 
+        final String transportQuery = buildTransportQuery();
+        final String brokerlist = copy.stream()
+                                    .map(port -> String.format("tcp://%s:%d%s", _host, port, transportQuery))
+                                    .collect(Collectors.joining(";", "brokerlist='", "'"));
+        cUrlBuilder.append(brokerlist);
+
         if (_enableFailover)
         {
-            cUrlBuilder.append(";tcp://").append(_host).append(":");
-            if (_enableTls)
-            {
-                cUrlBuilder.append(System.getProperty("test.port.alt.ssl")).append("?ssl='true'");
-            }
-            else
-            {
-                cUrlBuilder.append(System.getProperty("test.port.alt"));
-            }
-            cUrlBuilder.append("'")
-                       .append("&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='")
-                       .append(_reconnectAttempts)
-                       .append("''");
+            cUrlBuilder.append("&sync_ack='true'&sync_publish='all'");
+            cUrlBuilder.append(String.format("&failover='roundrobin?cyclecount='%d''", _reconnectAttempts));
         }
-        else
+
+        if (_saslMechanisms != null)
         {
-            cUrlBuilder.append("'");
+            cUrlBuilder.append("&");
+            cUrlBuilder.append("sasl_mechs='").append(_saslMechanisms).append('\'');
         }
 
         for (Map.Entry<String, Object> entry : _options.entrySet())
@@ -375,6 +369,47 @@ public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
         }
     }
 
+    /**
+     * Assembles the per-broker option string that is appended to each {@code tcp://host:port}
+     * entry of the brokerlist: the TLS settings when TLS is enabled, followed by
+     * {@code connectdelay} when a positive reconnect delay has been configured.
+     *
+     * @return the options, introduced by {@code ?} and separated by {@code &}, or an empty string
+     *         when there is nothing to add
+     */
+    private String buildTransportQuery()
+    {
+        final StringBuilder query = new StringBuilder();
+
+        if (_enableTls)
+        {
+            // ssl is always the first option, so every following TLS option can use '&'.
+            query.append("?ssl='true'");
+
+            if (_keyStoreLocation != null)
+            {
+                final String encodedKeyStore = encodeBrokerOption(_keyStoreLocation);
+                query.append("&key_store='").append(encodedKeyStore).append('\'');
+            }
+            if (_keyStorePassword != null)
+            {
+                query.append("&key_store_password='").append(_keyStorePassword).append('\'');
+            }
+            if (_trustStoreLocation != null)
+            {
+                final String encodedTrustStore = encodeBrokerOption(_trustStoreLocation);
+                query.append("&trust_store='").append(encodedTrustStore).append('\'');
+            }
+            if (_trustStorePassword != null)
+            {
+                query.append("&trust_store_password='").append(_trustStorePassword).append('\'');
+            }
+            if (_verifyHostName != null)
+            {
+                query.append("&ssl_verify_hostname='").append(_verifyHostName).append('\'');
+            }
+            if (_keyAlias != null)
+            {
+                query.append("&ssl_cert_alias='").append(_keyAlias).append('\'');
+            }
+        }
+
+        if (_connectdelay > 0)
+        {
+            // Without TLS this is the first option and therefore needs the leading '?'.
+            query.append(query.length() == 0 ? '?' : '&')
+                 .append(String.format("connectdelay='%d'", _connectdelay));
+        }
+
+        return query.toString();
+    }
+
     private String encodeBrokerOption(final String canonicalPath)
     {
         try

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xProvider.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xProvider.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xProvider.java
index 89de60a..733b650 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xProvider.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClient0xProvider.java
@@ -22,9 +22,13 @@ package org.apache.qpid.systests;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.AccessControlException;
 import java.util.Hashtable;
+import java.util.Objects;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -73,7 +77,7 @@ public class QpidJmsClient0xProvider implements JmsProvider
     public Queue createQueue(Session session, String queueName) throws JMSException
     {
 
-        Queue amqQueue = null;
+        Queue amqQueue;
         try
         {
             amqQueue = getTestQueue(queueName);
@@ -148,4 +152,73 @@ public class QpidJmsClient0xProvider implements JmsProvider
     {
         return new QpidJmsClient0xConnectionBuilder();
     }
+
+    /**
+     * Adapts {@code listener} onto the client's {@code org.apache.qpid.jms.ConnectionListener}
+     * interface via a dynamic proxy and installs it by reflectively calling
+     * {@code setConnectionListener} on the connection, so this module needs no compile-time
+     * dependency on the client library.
+     * <p>
+     * {@code preFailover} is reported as an interruption and {@code failoverComplete} as a
+     * restoration; in both cases the URI passed on is whatever {@link #getConnectedURI(Connection)}
+     * returns at the moment of the callback.  All other listener callbacks are ignored.
+     *
+     * @param connection the connection on which to install the listener
+     * @param listener   the client-neutral listener to notify
+     * @throws RuntimeException if the client listener type or setter cannot be found or invoked
+     */
+    @Override
+    public void addGenericConnectionListener(final Connection connection,
+                                             final GenericConnectionListener listener)
+    {
+        try
+        {
+            final Class<?> iface = Class.forName("org.apache.qpid.jms.ConnectionListener");
+            final Object listenerProxy = Proxy.newProxyInstance(
+                    iface.getClassLoader(),
+                    new Class<?>[]{iface},
+                    (proxy, method, args) -> {
+                        switch (method.getName())
+                        {
+                            case "preFailover":
+                                listener.onConnectionInterrupted(getConnectedURI(connection));
+                                // NOTE(review): presumably 'true' tells the client that failover
+                                // may proceed - confirm against the ConnectionListener contract.
+                                return true;
+                            case "preResubscribe":
+                                return true;
+                            case "failoverComplete":
+                                listener.onConnectionRestored(getConnectedURI(connection));
+                                return null;
+                            case "toString":
+                                return String.format("[Proxy %s]", listener.toString());
+                            case "equals":
+                                // Inside a lambda 'this' is the enclosing provider, not the proxy,
+                                // so compare the proxy itself (identity semantics).
+                                return proxy == args[0];
+                            case "hashCode":
+                                // Must not call proxy.hashCode() here: it would re-enter this handler.
+                                return System.identityHashCode(proxy);
+                            default:
+                                // Remaining callbacks are of no interest to the generic listener.
+                                return null;
+                        }
+                    });
+
+            final Method setConnectionListener = connection.getClass().getMethod("setConnectionListener", iface);
+            setConnectionListener.invoke(connection, listenerProxy);
+        }
+        catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e)
+        {
+            throw new RuntimeException("Unable to reflectively add listener", e);
+        }
+    }
+
+    /**
+     * Builds a URI of the form {@code transport://host:port} from the object returned by the
+     * connection's {@code getActiveBrokerDetails()} method.  Both the connection and the broker
+     * details object are accessed reflectively.
+     *
+     * @param connection the connection to query
+     * @return the URI assembled from the active broker's transport, host and port
+     * @throws RuntimeException if any of the reflective lookups or invocations fails
+     */
+    @Override
+    public URI getConnectedURI(final Connection connection)
+    {
+        try
+        {
+            final Object activeBroker =
+                    connection.getClass().getMethod("getActiveBrokerDetails").invoke(connection);
+            final Class<?> detailsType = activeBroker.getClass();
+
+            // Resolve all accessors before invoking any of them.
+            final Method hostGetter = detailsType.getMethod("getHost");
+            final Method portGetter = detailsType.getMethod("getPort");
+            final Method transportGetter = detailsType.getMethod("getTransport");
+
+            final String brokerHost = (String) hostGetter.invoke(activeBroker);
+            final int brokerPort = (Integer) portGetter.invoke(activeBroker);
+            final String brokerTransport = (String) transportGetter.invoke(activeBroker);
+
+            final String uriText = String.format("%s://%s:%d", brokerTransport, brokerHost, brokerPort);
+            return URI.create(uriText);
+        }
+        catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e)
+        {
+            throw new RuntimeException("Unable to reflectively get connected URI", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientConnectionBuilder.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientConnectionBuilder.java
index 76e3a76..263ff9c 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientConnectionBuilder.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientConnectionBuilder.java
@@ -22,10 +22,15 @@ package org.apache.qpid.systests;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Hashtable;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -43,6 +48,7 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
     private Map<String, Object> _options;
     private boolean _enableTls;
     private boolean _enableFailover;
+    private final List<Integer> _failoverPorts = new ArrayList<>();
 
     QpidJmsClientConnectionBuilder()
     {
@@ -68,6 +74,13 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
     }
 
     @Override
+    public ConnectionBuilder addFailoverPort(final int port)
+    {
+        // Only recorded here; when the failover URL is built every recorded port becomes an
+        // additional amqp://<host>:<port> entry after the primary one.
+        _failoverPorts.add(port);
+        return this;
+    }
+
+    @Override
     public ConnectionBuilder setSslPort(final int port)
     {
         _sslPort = port;
@@ -145,6 +158,13 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
     }
 
     @Override
+    public ConnectionBuilder setFailoverReconnectDelay(final int connectDelay)
+    {
+        // Stored as the client option "failover.reconnectDelay".
+        // NOTE(review): presumably appended to the connection URL with the other options - confirm.
+        _options.put("failover.reconnectDelay", connectDelay);
+        return this;
+    }
+
+    @Override
     public ConnectionBuilder setTls(final boolean enableTls)
     {
         _enableTls = enableTls;
@@ -270,13 +290,43 @@ public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
             {
                 options.put("failover.maxReconnectAttempts", "2");
             }
-            connectionUrlBuilder.append("failover:(amqp://")
-                    .append(_host)
-                    .append(":")
-                    .append(_port)
-                    .append(",amqp://localhost:")
-                    .append(System.getProperty("test.port.alt"))
-                    .append(")");
+
+            final Set<String> transportKeys = options.keySet()
+                                                     .stream()
+                                                     .filter(key -> key.startsWith("amqp.") || key.startsWith(
+                                                             "transport."))
+                                                     .collect(Collectors.toSet());
+
+
+            final Map<String, Object> transportOptions = new HashMap<>(options);
+            transportOptions.keySet().retainAll(transportKeys);
+            options.keySet().removeAll(transportKeys);
+
+            final StringBuilder transportQueryBuilder = new StringBuilder();
+            appendOptions(transportOptions, transportQueryBuilder);
+            final String transportQuery = transportQueryBuilder.toString();
+
+            final List<Integer> copy = new ArrayList<>(_failoverPorts.size() + 1);
+            copy.add(_enableTls ? _sslPort : _port);
+
+            if (_failoverPorts.isEmpty())
+            {
+                Integer testPortAlt;
+                if ((testPortAlt = Integer.getInteger("test.port.alt")) != null)
+                {
+                    copy.add(testPortAlt);
+                }
+                else if (_enableTls && (testPortAlt = Integer.getInteger("test.port.alt.ssl")) != null)
+                {
+                    copy.add(testPortAlt);
+                }
+            }
+            copy.addAll(_failoverPorts);
+
+            final String failover = copy.stream()
+                                        .map(port -> String.format("amqp://%s:%d%s", _host, port, transportQuery))
+                                        .collect(Collectors.joining(",", "failover:(", ")"));
+            connectionUrlBuilder.append(failover);
             appendOptions(options, connectionUrlBuilder);
         }
         else if (!_enableTls)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientProvider.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientProvider.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientProvider.java
index d8af7f6..d450e11 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientProvider.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/QpidJmsClientProvider.java
@@ -20,7 +20,12 @@
 
 package org.apache.qpid.systests;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Objects;
 import java.util.Properties;
 
 import javax.jms.Connection;
@@ -120,4 +125,63 @@ public class QpidJmsClientProvider implements JmsProvider
             initialContext.close();
         }
     }
+
+    /**
+     * Adapts {@code listener} onto the client's {@code org.apache.qpid.jms.JmsConnectionListener}
+     * interface via a dynamic proxy and registers it by reflectively calling
+     * {@code addConnectionListener} on the connection, so this module needs no compile-time
+     * dependency on the client library.
+     * <p>
+     * {@code onConnectionRestored} and {@code onConnectionInterrupted} are forwarded together with
+     * the URI the client passes as their first argument; all other listener callbacks are ignored.
+     *
+     * @param connection the connection on which to register the listener
+     * @param listener   the client-neutral listener to notify
+     * @throws RuntimeException if the client listener type or registration method cannot be found
+     *                          or invoked
+     */
+    @Override
+    public void addGenericConnectionListener(final Connection connection, final GenericConnectionListener listener)
+    {
+        try
+        {
+            final Class<?> iface = Class.forName("org.apache.qpid.jms.JmsConnectionListener");
+            final Object listenerProxy = Proxy.newProxyInstance(
+                    iface.getClassLoader(),
+                    new Class<?>[]{iface},
+                    (proxy, method, args) -> {
+                        switch (method.getName())
+                        {
+                            case "onConnectionRestored":
+                                listener.onConnectionRestored((URI) args[0]);
+                                return null;
+                            case "onConnectionInterrupted":
+                                listener.onConnectionInterrupted((URI) args[0]);
+                                return null;
+                            case "toString":
+                                return String.format("[Proxy %s]", listener.toString());
+                            case "equals":
+                                // Inside a lambda 'this' is the enclosing provider, not the proxy,
+                                // so compare the proxy itself (identity semantics).
+                                return proxy == args[0];
+                            case "hashCode":
+                                // Must not call proxy.hashCode() here: it would re-enter this handler.
+                                return System.identityHashCode(proxy);
+                            default:
+                                // Remaining callbacks are of no interest to the generic listener.
+                                return null;
+                        }
+                    });
+
+            final Method addConnectionListener = connection.getClass().getMethod("addConnectionListener", iface);
+            addConnectionListener.invoke(connection, listenerProxy);
+        }
+        catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e)
+        {
+            throw new RuntimeException("Unable to reflectively add listener", e);
+        }
+    }
+
+    /**
+     * Returns the result of reflectively calling the connection's public no-argument
+     * {@code getConnectedURI()} method.
+     *
+     * @param connection the connection to query
+     * @return the URI reported by the connection
+     * @throws RuntimeException if the method cannot be found or invoked
+     */
+    @Override
+    public URI getConnectedURI(final Connection connection)
+    {
+        try
+        {
+            final Method uriAccessor = connection.getClass().getMethod("getConnectedURI");
+            return (URI) uriAccessor.invoke(connection);
+        }
+        catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e)
+        {
+            throw new RuntimeException("Unable to reflectively get connected URI", e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index fe53a24..654c6a0 100755
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -99,6 +99,7 @@ public class QpidBrokerTestCase extends QpidTestCase
     private AmqpManagementFacade _managementFacade;
     private BrokerHolder _defaultBroker;
     private MessageType _messageType = MessageType.TEXT;
+
     private JmsProvider _jmsProvider;
 
     @Override
@@ -217,6 +218,11 @@ public class QpidBrokerTestCase extends QpidTestCase
         getDefaultBroker().restart();
     }
 
+    /**
+     * Exposes the {@link JmsProvider} held by this test case so that tests can use
+     * client-neutral facilities without referring to a specific JMS client.
+     *
+     * @return the provider stored in {@code _jmsProvider}
+     */
+    public JmsProvider getJmsProvider()
+    {
+        return _jmsProvider;
+    }
+
     public ConnectionBuilder getConnectionBuilder()
     {
         final ConnectionBuilder connectionBuilder = _jmsProvider.getConnectionBuilder()
@@ -571,7 +577,7 @@ public class QpidBrokerTestCase extends QpidTestCase
         sendMessage(session, destination, 1);
         session.commit();
         connection.start();
-        Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+        Message m1 = consumer.receive(getReceiveTimeout());
         assertNotNull("Message 1 is not received", m1);
         assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
         session.commit();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c09a9a31/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index d8ae4d7..ba9a39e 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -82,8 +82,6 @@ org.apache.qpid.systest.rest.acl.*
 
 
 
-org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
-
 org.apache.qpid.server.protocol.v0_8.*
 
 //Qpid Broker-J BDB System Tests


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org