You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/04/10 08:51:08 UTC
[2/3] qpid-broker-j git commit: QPID-8158: [Broker-J] [System Tests]
Refactor MultiNodeTest and TwoNodeTest to use QpidTestRunner and BrokerAdmin
for running the tests
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
index b80f002..ad9df21 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
@@ -19,20 +19,33 @@
*/
package org.apache.qpid.server.store.berkeleydb.replication;
+import static junit.framework.TestCase.assertEquals;
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.File;
import java.net.URI;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -48,11 +61,11 @@ import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.util.FileUtils;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
@@ -60,120 +73,94 @@ import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.NodeRole;
import org.apache.qpid.systests.ConnectionBuilder;
import org.apache.qpid.systests.GenericConnectionListener;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.systests.Utils;
+import org.apache.qpid.test.utils.PortHelper;
import org.apache.qpid.test.utils.TestUtils;
+import org.apache.qpid.tests.utils.ConfigItem;
+import org.apache.qpid.tests.utils.RunBrokerAdmin;
-public class MultiNodeTest extends QpidBrokerTestCase
+@RunBrokerAdmin(type = "BDB-HA")
+@GroupConfig(numberOfNodes = 3, groupName = "test")
+@ConfigItem(name = Broker.BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD, value = "false")
+public class MultiNodeTest extends GroupJmsTestBase
{
private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeTest.class);
- private static final String VIRTUAL_HOST = "test";
- private static final int NUMBER_OF_NODES = 3;
- private static final int FAILOVER_COMPLETION_TIMEOUT = 60000;
-
- private GroupCreator _groupCreator;
-
- private FailoverAwaitingListener _failoverListener;
-
- /** Used when expectation is client will (re)-connect */
- private ConnectionBuilder _positiveFailoverBuilder;
-
- /** Used when expectation is client will not (re)-connect */
- private ConnectionBuilder _negativeFailoverBuilder;
-
- @Override
- protected void setUp() throws Exception
- {
- assertTrue(isJavaBroker());
- assertTrue(isBrokerStorePersistent());
- _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
- _groupCreator.configureClusterNodes();
+ private FailoverAwaitingListener _failoverListener = new FailoverAwaitingListener();
- _positiveFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes();
- _negativeFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(200, 2);
-
- _groupCreator.startCluster();
- _failoverListener = new FailoverAwaitingListener();
-
- super.setUp();
- }
-
- @Override
- public void startDefaultBroker() throws Exception
- {
- // Don't start default broker provided by QBTC.
- }
+ private static final int FAILOVER_COMPLETION_TIMEOUT = 60000;
+ @Test
public void testLossOfMasterNodeCausesClientToFailover() throws Exception
{
- final Connection connection = _positiveFailoverBuilder.build();
- getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+ final Connection connection = getConnectionBuilder().build();
+ try
+ {
+ getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
- LOGGER.info("Active connection port {}", activeBrokerPort);
+ final int masterPort = getJmsProvider().getConnectedURI(connection).getPort();
+ LOGGER.info("Active connection port {}", masterPort);
- _groupCreator.stopNode(activeBrokerPort);
- LOGGER.info("Node is stopped");
- _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
- LOGGER.info("Listener has finished");
- // any op to ensure connection remains
- connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ getBrokerAdmin().stopNode(masterPort);
+ LOGGER.info("Node is stopped");
+ _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+ LOGGER.info("Listener has finished");
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ finally
+ {
+ connection.close();
+ }
}
+ @Test
public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception
{
- final Connection connection = _positiveFailoverBuilder.build();
- getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
-
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
- LOGGER.info("Active connection port {}", activeBrokerPort);
+ final Connection connection = getConnectionBuilder().build();
+ try
+ {
+ getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
- final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
- LOGGER.info("Stopping inactive broker on port {} ", inactiveBrokerPort);
+ final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+ LOGGER.info("Active connection port {}", activeBrokerPort);
- _groupCreator.stopNode(inactiveBrokerPort);
+ final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort);
+ LOGGER.info("Stopping inactive broker on port {} ", inactiveBrokerPort);
- _failoverListener.assertNoFailoverCompletionWithin(2000);
+ getBrokerAdmin().stopNode(inactiveBrokerPort);
- // any op to ensure connection remains
- connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
+ _failoverListener.assertNoFailoverCompletionWithin(2000);
- public void testLossOfQuorumCausesClientDisconnection() throws Exception
- {
- if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
- {
- // TODO - QPIDJMS-366 - there seems to be a client defect when a JMS operation is interrupted
- // by a graceful connection close from the client side.
- return;
+ // any op to ensure connection remains
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
-
- final Connection connection = _negativeFailoverBuilder.build();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination destination = session.createQueue(getTestQueueName());
- getJmsProvider().createQueue(session, getTestQueueName());
-
- Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
-
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
- ports.remove(activeBrokerPort);
-
- // Stop all other nodes
- for (Integer p : ports)
+ finally
{
- _groupCreator.stopNode(p);
+ connection.close();
}
+ }
+ @Test
+ public void testLossOfQuorumCausesClientDisconnection() throws Exception
+ {
+ final Connection connection = getConnectionBuilder().build();
try
{
+ getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+ Set<Integer> ports =
+ Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
+ final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+ ports.remove(activeBrokerPort);
+
+ // Stop all other nodes
+ for (Integer p : ports)
+ {
+ getBrokerAdmin().stopNode(p);
+ }
- sendMessage(session, destination, 1);
- fail("Exception not thrown - sending message within a transaction should fail without quorum");
- }
- catch(JMSException jms)
- {
- // PASS
+ _failoverListener.awaitPreFailover(2000);
}
finally
{
@@ -184,7 +171,10 @@ public class MultiNodeTest extends QpidBrokerTestCase
// New connections should now fail as vhost will be unavailable
try
{
- Connection unexpectedConnection = _negativeFailoverBuilder.build();
+ Connection unexpectedConnection = getConnectionBuilder()
+ .setFailoverReconnectAttempts(SHORT_FAILOVER_CYCLECOUNT)
+ .setFailoverReconnectDelay(SHORT_FAILOVER_CONNECTDELAY)
+ .build();
fail("Got unexpected connection to node in group without quorum " + unexpectedConnection);
}
catch (JMSException je)
@@ -198,442 +188,490 @@ public class MultiNodeTest extends QpidBrokerTestCase
* test ensures that open messaging transactions are correctly rolled-back as quorum is lost,
* and later the node rejoins the group in either master or replica role.
*/
+ @Test
public void testQuorumLostAndRestored_OriginalMasterRejoinsTheGroup() throws Exception
{
- final Connection connection = _positiveFailoverBuilder.build();
- getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+ final Connection connection = getConnectionBuilder().build();
+ try
+ {
+ getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+ Destination dest = createTestQueue(connection);
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- Destination dest = session.createQueue(getTestQueueName());
- session.close();
+ Set<Integer> ports =
+ Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
- Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
+ final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+ ports.remove(activeBrokerPort);
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
- ports.remove(activeBrokerPort);
+ Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Session session2 = connection.createSession(true, Session.SESSION_TRANSACTED);
- Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED);
- Session session2 = connection.createSession(true, Session.SESSION_TRANSACTED);
+ session1.createConsumer(dest).close();
- session1.createConsumer(dest).close();
+ MessageProducer producer1 = session1.createProducer(dest);
+ producer1.send(session1.createMessage());
+ MessageProducer producer2 = session2.createProducer(dest);
+ producer2.send(session2.createMessage());
- MessageProducer producer1 = session1.createProducer(dest);
- producer1.send(session1.createMessage());
- MessageProducer producer2 = session2.createProducer(dest);
- producer2.send(session2.createMessage());
+ // Leave transactions open, this will leave two store transactions open on the store
- // Leave transactions open, this will leave two store transactions open on the store
+ // Stop all other nodes
+ for (Integer p : ports)
+ {
+ getBrokerAdmin().stopNode(p);
+ }
- // Stop all other nodes
- for (Integer p : ports)
- {
- _groupCreator.stopNode(p);
- }
+ // Await the old master discovering that it is all alone
+ getBrokerAdmin().awaitNodeRole(activeBrokerPort, "WAITING");
- // Await the old master discovering that it is all alone
- _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "WAITING");
+ // Restart all other nodes
+ for (Integer p : ports)
+ {
+ getBrokerAdmin().startNode(p);
+ }
- // Restart all other nodes
- for (Integer p : ports)
+ _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+
+ getBrokerAdmin().awaitNodeRole(activeBrokerPort, "MASTER", "REPLICA");
+ }
+ finally
{
- _groupCreator.startNode(p);
+ connection.close();
}
-
- _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
-
- _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "MASTER", "REPLICA");
}
+ @Test
public void testPersistentMessagesAvailableAfterFailover() throws Exception
{
- final Connection connection = _positiveFailoverBuilder.build();
- getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+ final Connection connection = getConnectionBuilder().build();
+ try
+ {
+ getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+ Destination queue = createTestQueue(connection);
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- Destination queue = session.createQueue(getTestQueueName());
- session.close();
+ final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ Session producingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Utils.sendMessages(producingSession, queue, 10);
- Session producingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- sendMessage(producingSession, queue, 10);
+ getBrokerAdmin().stopNode(activeBrokerPort);
+ LOGGER.info("Old master (broker port {}) is stopped", activeBrokerPort);
- _groupCreator.stopNode(activeBrokerPort);
- LOGGER.info("Old master (broker port {}) is stopped", activeBrokerPort);
+ _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+ LOGGER.info("Failover has finished");
- _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
- LOGGER.info("Failover has finished");
+ final int activeBrokerPortAfterFailover = getJmsProvider().getConnectedURI(connection).getPort();
+ LOGGER.info("New master (broker port {}) after failover", activeBrokerPortAfterFailover);
- final int activeBrokerPortAfterFailover = _groupCreator.getBrokerPortNumberFromConnection(connection);
- LOGGER.info("New master (broker port {}) after failover", activeBrokerPortAfterFailover);
+ Session consumingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumingSession.createConsumer(queue);
- Session consumingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = consumingSession.createConsumer(queue);
-
- connection.start();
- for(int i = 0; i < 10; i++)
+ connection.start();
+ for (int i = 0; i < 10; i++)
+ {
+ Message m = consumer.receive(getReceiveTimeout());
+ assertNotNull("Message " + i + " is not received", m);
+ assertEquals("Unexpected message received", i, m.getIntProperty(INDEX));
+ }
+ consumingSession.commit();
+ }
+ finally
{
- Message m = consumer.receive(getReceiveTimeout());
- assertNotNull("Message " + i + " is not received", m);
- assertEquals("Unexpected message received", i, m.getIntProperty(INDEX));
+ connection.close();
}
- consumingSession.commit();
}
+ @Test
public void testTransferMasterFromLocalNode() throws Exception
{
- final Connection connection = _positiveFailoverBuilder.build();
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- session.close();
+ final Connection connection = getConnectionBuilder().build();
+ try
+ {
+ Destination queue = createTestQueue(connection);
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
- LOGGER.info("Active connection port {}", activeBrokerPort);
+ final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+ LOGGER.info("Active connection port {}", activeBrokerPort);
- final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
- LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
+ final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort);
+ LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
- // transfer mastership 3 times in order to verify
- // that repeated mastership transfer to the same node works, See QPID-6996
- transferMasterFromLocalNode(connection, inactiveBrokerPort, activeBrokerPort);
- transferMasterFromLocalNode(connection, activeBrokerPort, inactiveBrokerPort);
- transferMasterFromLocalNode(connection, inactiveBrokerPort, activeBrokerPort);
+ // transfer mastership 3 times in order to verify
+ // that repeated mastership transfer to the same node works, See QPID-6996
+ transferMasterFromLocalNode(connection, queue, inactiveBrokerPort, activeBrokerPort);
+ transferMasterFromLocalNode(connection, queue, activeBrokerPort, inactiveBrokerPort);
+ transferMasterFromLocalNode(connection, queue, inactiveBrokerPort, activeBrokerPort);
+ }
+ finally
+ {
+ connection.close();
+ }
}
private void transferMasterFromLocalNode(final Connection connection,
+ final Destination queue,
final int inactiveBrokerPort,
final int activeBrokerPort) throws Exception
{
_failoverListener = new FailoverAwaitingListener();
getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
- Map<String, Object> attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+ Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
- _groupCreator.setNodeAttributes(inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+ getBrokerAdmin().setNodeAttributes(inactiveBrokerPort,
+ Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
_failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
LOGGER.info("Listener has finished");
- attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+ attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
- assertProducingConsuming(connection);
+ assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
- _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+ getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA");
}
+ @Test
public void testTransferMasterFromRemoteNode() throws Exception
{
- final Connection connection = _positiveFailoverBuilder.build();
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- session.close();
-
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
- LOGGER.info("Active connection port {}", activeBrokerPort);
-
- final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
- LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
-
- // transfer mastership 3 times in order to verify
- // that repeated mastership transfer to the same node works, See QPID-6996
- transferMasterFromRemoteNode(connection, activeBrokerPort, inactiveBrokerPort);
- transferMasterFromRemoteNode(connection, inactiveBrokerPort, activeBrokerPort);
- transferMasterFromRemoteNode(connection, activeBrokerPort, inactiveBrokerPort);
+ final Connection connection = getConnectionBuilder().build();
+ try
+ {
+ Destination queue = createTestQueue(connection);
+ final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+ LOGGER.info("Active connection port {}", activeBrokerPort);
+
+ final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort);
+ LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
+
+ // transfer mastership 3 times in order to verify
+ // that repeated mastership transfer to the same node works, See QPID-6996
+ transferMasterFromRemoteNode(connection, queue, activeBrokerPort, inactiveBrokerPort);
+ transferMasterFromRemoteNode(connection, queue, inactiveBrokerPort, activeBrokerPort);
+ transferMasterFromRemoteNode(connection, queue, activeBrokerPort, inactiveBrokerPort);
+ }
+ finally
+ {
+ connection.close();
+ }
}
private void transferMasterFromRemoteNode(final Connection connection,
+ final Destination queue,
final int activeBrokerPort,
final int inactiveBrokerPort) throws Exception
{
_failoverListener = new FailoverAwaitingListener();
getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
- _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
- Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
+ getBrokerAdmin().awaitRemoteNodeRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
+ Map<String, Object> attributes = getBrokerAdmin().getRemoteNodeAttributes(activeBrokerPort, inactiveBrokerPort);
assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
- _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+ getBrokerAdmin().setRemoteNodeAttributes(activeBrokerPort,
+ inactiveBrokerPort,
+ Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
_failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
LOGGER.info("Listener has finished");
- attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+ attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
- assertProducingConsuming(connection);
+ assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
- _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+ getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA");
}
+
+ @Test
public void testTransferMasterWhilstMessagesInFlight() throws Exception
{
- final Connection connection = _positiveFailoverBuilder.build();
- getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- final Destination destination = session.createQueue(getTestQueueName());
-
- final AtomicBoolean masterTransferred = new AtomicBoolean(false);
- final AtomicBoolean keepRunning = new AtomicBoolean(true);
- final AtomicReference<Exception> workerException = new AtomicReference<>();
- final CountDownLatch producedOneBefore = new CountDownLatch(1);
- final CountDownLatch producedOneAfter = new CountDownLatch(1);
- final CountDownLatch workerShutdown = new CountDownLatch(1);
-
- Runnable producer = () -> {
- try
- {
- int count = 0;
- MessageProducer producer1 = session.createProducer(destination);
+ final Connection connection = getConnectionBuilder().build();
+ try
+ {
+ getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+
+ final Destination destination = createTestQueue(connection);
- while (keepRunning.get())
+ final AtomicBoolean masterTransferred = new AtomicBoolean(false);
+ final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ final AtomicReference<Exception> workerException = new AtomicReference<>();
+ final CountDownLatch producedOneBefore = new CountDownLatch(1);
+ final CountDownLatch producedOneAfter = new CountDownLatch(1);
+ final CountDownLatch workerShutdown = new CountDownLatch(1);
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Runnable producer = () -> {
+ try
{
- String messageText = "message" + count;
- try
+ int count = 0;
+ MessageProducer producer1 = session.createProducer(destination);
+
+ while (keepRunning.get())
{
- Message message = session.createTextMessage(messageText);
- producer1.send(message);
- session.commit();
- LOGGER.debug("Sent message " + count);
+ String messageText = "message" + count;
+ try
+ {
+ Message message = session.createTextMessage(messageText);
+ producer1.send(message);
+ session.commit();
+ LOGGER.debug("Sent message " + count);
- producedOneBefore.countDown();
+ producedOneBefore.countDown();
- if (masterTransferred.get())
+ if (masterTransferred.get())
+ {
+ producedOneAfter.countDown();
+ }
+ count++;
+ }
+ catch (javax.jms.IllegalStateException ise)
{
- producedOneAfter.countDown();
+ throw ise;
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ // Pass - failover in progress
+ }
+ catch (JMSException je)
+ {
+ // Pass - failover in progress
}
- count++;
- }
- catch (javax.jms.IllegalStateException ise)
- {
- throw ise;
- }
- catch (TransactionRolledBackException trbe)
- {
- // Pass - failover in prgoress
- }
- catch(JMSException je)
- {
- // Pass - failover in progress
}
}
- }
- catch (Exception e)
- {
- workerException.set(e);
- }
- finally
- {
- workerShutdown.countDown();
- }
- };
+ catch (Exception e)
+ {
+ workerException.set(e);
+ }
+ finally
+ {
+ workerShutdown.countDown();
+ }
+ };
- Thread backgroundWorker = new Thread(producer);
- backgroundWorker.start();
+ Thread backgroundWorker = new Thread(producer);
+ backgroundWorker.start();
- boolean workerRunning = producedOneBefore.await(5000, TimeUnit.MILLISECONDS);
- assertTrue(workerRunning);
+ boolean workerRunning = producedOneBefore.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue(workerRunning);
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
- LOGGER.info("Active connection port {}", activeBrokerPort);
+ final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+ LOGGER.info("Active connection port {}", activeBrokerPort);
- final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
- LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
+ final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort);
+ LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
- _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
- Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
- assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+ getBrokerAdmin().awaitNodeRole(inactiveBrokerPort, "REPLICA");
+ Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
- _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+ getBrokerAdmin().setNodeAttributes(inactiveBrokerPort,
+ Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
- _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
- LOGGER.info("Failover has finished");
+ _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+ LOGGER.info("Failover has finished");
- attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
- assertEquals("New master has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+ attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
+ assertEquals("New master has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
- _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+ getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA");
- LOGGER.info("Master transfer known to have completed successfully.");
- masterTransferred.set(true);
+ LOGGER.info("Master transfer known to have completed successfully.");
+ masterTransferred.set(true);
- boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS);
- assertTrue("Should have successfully produced at least one message after transfer complete", producedMore);
+ boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue("Should have successfully produced at least one message after transfer complete", producedMore);
- keepRunning.set(false);
- boolean shutdown = workerShutdown.await(5000, TimeUnit.MILLISECONDS);
- assertTrue("Worker thread should have shutdown", shutdown);
+ keepRunning.set(false);
+ boolean shutdown = workerShutdown.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue("Worker thread should have shutdown", shutdown);
- backgroundWorker.join(5000);
- assertNull(workerException.get());
+ backgroundWorker.join(5000);
+ assertThat(workerException.get(), is(nullValue()));
- assertNotNull(session.createTemporaryQueue());
+ assertNotNull(session.createTemporaryQueue());
+ }
+ finally
+ {
+ connection.close();
+ }
}
+ @Test
public void testInFlightTransactionsWhilstMajorityIsLost() throws Exception
{
- if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
- {
- // TODO - QPIDJMS-366 - there seems to be a client defect when a JMS operation is interrupted
- // by a graceful connection close from the client side.
- return;
- }
- int connectionNumber = Integer.getInteger("MultiNodeTest.testInFlightTransactionsWhilstMajorityIsLost.numberOfConnections", 20);
- ExecutorService executorService = Executors.newFixedThreadPool(connectionNumber + NUMBER_OF_NODES - 1);
+ int connectionNumber = Integer.getInteger(
+ "MultiNodeTest.testInFlightTransactionsWhilstMajorityIsLost.numberOfConnections",
+ 20);
+ ExecutorService executorService = Executors.newFixedThreadPool(connectionNumber + 2);
try
{
- final ConnectionBuilder consumerBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(100, 100);
- final Connection consumerConnection = consumerBuilder.build();
- Session s = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(s, getTestQueueName());
- s.close();
+ final ConnectionBuilder connectionBuilder =
+ getConnectionBuilder().setFailoverReconnectDelay(100).setFailoverReconnectAttempts(100);
+ final Connection consumerConnection = connectionBuilder.build();
+ try
+ {
+ Destination destination = createTestQueue(consumerConnection);
+ consumerConnection.start();
- consumerConnection.start();
+ final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumerSession.createConsumer(destination).setMessageListener(message -> {
+ try
+ {
+ LOGGER.info("Message received: " + ((TextMessage) message).getText());
+ }
+ catch (JMSException e)
+ {
+ LOGGER.error("Failure to get message text", e);
+ }
+ });
- final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Destination destination = consumerSession.createQueue(getTestQueueName());
- consumerSession.createConsumer(destination).setMessageListener(message -> {
- try
+ final Connection[] connections = new Connection[connectionNumber];
+ final Session[] sessions = new Session[connectionNumber];
+ for (int i = 0; i < sessions.length; i++)
{
- LOGGER.info("Message received: " + ((TextMessage) message).getText());
+ connections[i] = connectionBuilder.setClientId("test-" + UUID.randomUUID()).build();
+ sessions[i] = connections[i].createSession(true, Session.SESSION_TRANSACTED);
+ LOGGER.info("Session {} is created", i);
}
- catch (JMSException e)
+ try
{
- LOGGER.error("Failure to get message text", e);
- }
- });
-
- final Connection[] connections = new Connection[connectionNumber];
- final Session[] sessions = new Session[connectionNumber];
- for (int i = 0; i < sessions.length; i++)
- {
- final ConnectionBuilder builder = _groupCreator.getConnectionBuilderForAllClusterNodes(100, 100);
- connections[i] = builder.build();
- sessions[i] = connections[i].createSession(true, Session.SESSION_TRANSACTED);
- LOGGER.info("Session {} is created", i);
- }
+ Set<Integer> ports =
+ Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
- List<Integer> ports = new ArrayList<>(_groupCreator.getBrokerPortNumbersForNodes());
+ int maxMessageSize = 10;
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < maxMessageSize - 2; i++)
+ {
+ sb.append("X");
+ }
+ String messageText = sb.toString();
+ for (int n = 0; n < 3; n++)
+ {
+ LOGGER.info("Starting iteration {}", n);
- int maxMessageSize = 10;
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < maxMessageSize - 2; i++)
- {
- sb.append("X");
- }
- String messageText = sb.toString();
- for (int n = 0; n < NUMBER_OF_NODES; n++)
- {
- LOGGER.info("Starting iteration {}", n);
+ FailoverAwaitingListener failoverListener = new FailoverAwaitingListener(connectionNumber);
- FailoverAwaitingListener failoverListener = new FailoverAwaitingListener(connectionNumber);
+ for (int i = 0; i < sessions.length; i++)
+ {
+ Connection connection = connections[i];
+ getJmsProvider().addGenericConnectionListener(connection, failoverListener);
- for (int i = 0; i < sessions.length; i++)
- {
- Connection connection = connections[i];
- getJmsProvider().addGenericConnectionListener(connection, failoverListener);
+ MessageProducer producer = sessions[i].createProducer(destination);
+ Message message = sessions[i].createTextMessage(messageText + "-" + i);
+ producer.send(message);
+ }
- MessageProducer producer = sessions[i].createProducer(destination);
- Message message = sessions[i].createTextMessage(messageText + "-" + i);
- producer.send(message);
- }
+ LOGGER.info("All publishing sessions have uncommitted transactions");
- LOGGER.info("All publishing sessions have uncommitted transactions");
+ final int activeBrokerPort = getJmsProvider().getConnectedURI(connections[0]).getPort();
+ LOGGER.info("Active connection port {}", activeBrokerPort);
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connections[0]);
- LOGGER.info("Active connection port {}", activeBrokerPort);
+ List<Integer> inactivePorts = new ArrayList<>(ports);
+ inactivePorts.remove(new Integer(activeBrokerPort));
- List<Integer> inactivePorts = new ArrayList<>(ports);
- inactivePorts.remove(new Integer(activeBrokerPort));
+ final CountDownLatch latch = new CountDownLatch(inactivePorts.size());
+ for (int port : inactivePorts)
+ {
+ final int inactiveBrokerPort = port;
+ LOGGER.info("Stop node for inactive broker on port " + inactiveBrokerPort);
+
+ executorService.submit(() -> {
+ try
+ {
+ getBrokerAdmin().setNodeAttributes(inactiveBrokerPort,
+ Collections.singletonMap(
+ BDBHAVirtualHostNode.DESIRED_STATE,
+ State.STOPPED.name()));
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to stop node on broker with port {}", inactiveBrokerPort, e);
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ });
+ }
- final CountDownLatch latch = new CountDownLatch(inactivePorts.size());
- for (int port : inactivePorts)
- {
- final int inactiveBrokerPort = port;
- LOGGER.info("Stop node for inactive broker on port " + inactiveBrokerPort);
+ latch.await(500, TimeUnit.MILLISECONDS);
- executorService.submit(() -> {
- try
+ LOGGER.info("Committing transactions in parallel to provoke a lot of syncing to disk");
+ for (final Session session : sessions)
{
- _groupCreator.setNodeAttributes(inactiveBrokerPort,
- inactiveBrokerPort,
- Collections.singletonMap(
- BDBHAVirtualHostNode.DESIRED_STATE,
- State.STOPPED.name()));
+ executorService.submit(() -> {
+ try
+ {
+ session.commit();
+ }
+ catch (JMSException e)
+ {
+ // majority of commits might fail due to insufficient replicas
+ }
+ });
}
- catch (Exception e)
+
+ LOGGER.info("Verify that stopped nodes are in detached role");
+ for (int port : inactivePorts)
{
- LOGGER.error("Failed to stop node on broker with port {}", inactiveBrokerPort, e);
+ getBrokerAdmin().awaitNodeRole(port, NodeRole.DETACHED.name());
}
- finally
+
+ LOGGER.info("Start stopped nodes");
+ for (int port : inactivePorts)
{
- latch.countDown();
+ LOGGER.info("Starting node for inactive broker on port " + port);
+ try
+ {
+ getBrokerAdmin().setNodeAttributes(port,
+ Collections.singletonMap(
+ BDBHAVirtualHostNode.DESIRED_STATE,
+ State.ACTIVE.name()));
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Failed to start node on broker with port " + port, e);
+ }
}
- });
- }
- latch.await(500, TimeUnit.MILLISECONDS);
+ for (int port : ports)
+ {
+ getBrokerAdmin().awaitNodeRole(port, "REPLICA", "MASTER");
+ }
- LOGGER.info("Committing transactions in parallel to provoke a lot of syncing to disk");
- for (final Session session : sessions)
- {
- executorService.submit(() -> {
- try
+ if (failoverListener.isFailoverStarted())
{
- session.commit();
+ LOGGER.info("Waiting for failover completion");
+ failoverListener.awaitFailoverCompletion(20000 * connectionNumber);
+ LOGGER.info("Failover has finished");
}
- catch (JMSException e)
+ else
{
- // majority of commits might fail due to insufficient replicas
+ LOGGER.info("Failover never started");
}
- });
- }
-
- LOGGER.info("Verify that stopped nodes are in detached role");
- for (int port : inactivePorts)
- {
- _groupCreator.awaitNodeToAttainRole(port, NodeRole.DETACHED.name());
+ }
}
-
- LOGGER.info("Start stopped nodes");
- for (int port : inactivePorts)
+ finally
{
- LOGGER.info("Starting node for inactive broker on port " + port);
- try
- {
- _groupCreator.setNodeAttributes(port,
- port,
- Collections.singletonMap(
- BDBHAVirtualHostNode.DESIRED_STATE,
- State.ACTIVE.name()));
- }
- catch (Exception e)
+ for (Connection c: connections)
{
- LOGGER.error("Failed to start node on broker with port " + port, e);
+ try
+ {
+ c.close();
+ }
+ finally
+ {
+ LOGGER.error("Unexpected exception on connection close");
+ }
}
}
-
- for (int port : ports)
- {
- _groupCreator.awaitNodeToAttainRole(port, "REPLICA", "MASTER");
- }
-
- if (failoverListener.isFailoverStarted())
- {
- LOGGER.info("Waiting for failover completion");
- failoverListener.awaitFailoverCompletion(20000 * connectionNumber);
- LOGGER.info("Failover has finished");
- }
- else
- {
- LOGGER.info("Failover never started");
- }
+ }
+ finally
+ {
+ consumerConnection.close();
}
}
finally
@@ -646,127 +684,148 @@ public class MultiNodeTest extends QpidBrokerTestCase
* Tests aims to demonstrate that in a disaster situation (where all nodes except the master is lost), that operation
* can be continued from a single node using the QUORUM_OVERRIDE feature.
*/
+ @Test
public void testQuorumOverride() throws Exception
{
- final Connection connection = _positiveFailoverBuilder.build();
- getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- session.close();
+ final Connection connection = getConnectionBuilder().build();
+ try
+ {
+ getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+ Destination queue = createTestQueue(connection);
- Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
+ Set<Integer> ports =
+ Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
- ports.remove(activeBrokerPort);
+ final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+ ports.remove(activeBrokerPort);
- // Stop all other nodes
- for (Integer p : ports)
- {
- _groupCreator.stopNode(p);
- }
+ // Stop all other nodes
+ for (Integer p : ports)
+ {
+ getBrokerAdmin().stopNode(p);
+ }
- LOGGER.info("Awaiting failover to start");
- _failoverListener.awaitPreFailover(FAILOVER_COMPLETION_TIMEOUT);
- LOGGER.info("Failover has begun");
+ LOGGER.info("Awaiting failover to start");
+ _failoverListener.awaitPreFailover(FAILOVER_COMPLETION_TIMEOUT);
+ LOGGER.info("Failover has begun");
- Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort);
- assertEquals("Broker has unexpected quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
- _groupCreator.setNodeAttributes(activeBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1));
+ Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(activeBrokerPort);
+ assertEquals("Broker has unexpected quorum override",
+ new Integer(0),
+ attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
+ getBrokerAdmin().setNodeAttributes(activeBrokerPort,
+ Collections.singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1));
- attributes = _groupCreator.getNodeAttributes(activeBrokerPort);
- assertEquals("Broker has unexpected quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
+ attributes = getBrokerAdmin().getNodeAttributes(activeBrokerPort);
+ assertEquals("Broker has unexpected quorum override",
+ new Integer(1),
+ attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
- _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
- LOGGER.info("Failover has finished");
+ _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+ LOGGER.info("Failover has finished");
- assertProducingConsuming(connection);
+ assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
+ }
+ finally
+ {
+ connection.close();
+ }
}
+ @Test
public void testPriority() throws Exception
{
- final Connection connection = _positiveFailoverBuilder.build();
- getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- session.close();
+ final Connection connection = getConnectionBuilder().build();
+ try
+ {
+ getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+ Destination queue = createTestQueue(connection);
- final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
- LOGGER.info("Active connection port {}", activeBrokerPort);
+ final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+ LOGGER.info("Active connection port {}", activeBrokerPort);
- int priority = 1;
- Integer highestPriorityBrokerPort = null;
- Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
- for (Integer port : ports)
- {
- if (activeBrokerPort != port)
+ int priority = 1;
+ Integer highestPriorityBrokerPort = null;
+ Set<Integer> ports =
+ Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
+ for (Integer port : ports)
{
- priority = priority + 1;
- highestPriorityBrokerPort = port;
- _groupCreator.setNodeAttributes(port, port, Collections.singletonMap(BDBHAVirtualHostNode.PRIORITY, priority));
- Map<String, Object> attributes = _groupCreator.getNodeAttributes(port, port);
- assertEquals("Broker has unexpected priority", priority, attributes.get(BDBHAVirtualHostNode.PRIORITY));
+ if (activeBrokerPort != port)
+ {
+ priority = priority + 1;
+ highestPriorityBrokerPort = port;
+ getBrokerAdmin().setNodeAttributes(port,
+ Collections.singletonMap(BDBHAVirtualHostNode.PRIORITY,
+ priority));
+ Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(port);
+ assertEquals("Broker has unexpected priority",
+ priority,
+ attributes.get(BDBHAVirtualHostNode.PRIORITY));
+ }
}
- }
- LOGGER.info("Broker on port {} has the highest priority of {}", highestPriorityBrokerPort, priority);
+ LOGGER.info("Broker on port {} has the highest priority of {}", highestPriorityBrokerPort, priority);
- // make sure all remote nodes are materialized on the master
- // in order to make sure that DBPing is not invoked
- for (Integer port : ports)
- {
- if (activeBrokerPort != port)
+ for (Integer port : ports)
{
- _groupCreator.awaitNodeToAttainAttributeValue(activeBrokerPort, port, BDBHARemoteReplicationNode.ROLE, "REPLICA");
+ if (activeBrokerPort != port)
+ {
+ getBrokerAdmin().awaitNodeRole(port, BDBHARemoteReplicationNode.ROLE, "REPLICA");
+ }
}
- }
- // do work on master
- assertProducingConsuming(connection);
+ // do work on master
+ assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
- Map<String, Object> masterNodeAttributes = _groupCreator.getNodeAttributes(activeBrokerPort);
+ Map<String, Object> masterNodeAttributes = getBrokerAdmin().getNodeAttributes(activeBrokerPort);
- Object lastTransactionId = masterNodeAttributes.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID);
- assertTrue("Unexpected last transaction id: " + lastTransactionId, lastTransactionId instanceof Number);
+ Object lastTransactionId =
+ masterNodeAttributes.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID);
+ assertTrue("Unexpected last transaction id: " + lastTransactionId, lastTransactionId instanceof Number);
- // make sure all remote nodes have the same transaction id as master
- for (Integer port : ports)
- {
- if (activeBrokerPort != port)
+ // make sure all remote nodes have the same transaction id as master
+ for (Integer port : ports)
{
- _groupCreator.awaitNodeToAttainAttributeValue(activeBrokerPort,
- port,
- BDBHARemoteReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID,
- String.valueOf(lastTransactionId));
+ if (activeBrokerPort != port)
+ {
+ getBrokerAdmin().awaitNodeToAttainAttributeValue(activeBrokerPort,
+ BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID,
+ lastTransactionId);
+ }
}
- }
- LOGGER.info("Shutting down the MASTER");
- _groupCreator.stopNode(activeBrokerPort);
+ LOGGER.info("Shutting down the MASTER");
+ getBrokerAdmin().stopNode(activeBrokerPort);
- _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
- LOGGER.info("Listener has finished");
+ _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+ LOGGER.info("Listener has finished");
- Map<String, Object> attributes = _groupCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort);
- assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+ Map<String, Object> attributes =
+ getBrokerAdmin().getNodeAttributes(highestPriorityBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
- assertProducingConsuming(connection);
+ assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
+ }
+ finally
+ {
+ connection.close();
+ }
}
+ @Test
public void testClusterCannotStartWithIntruder() throws Exception
{
- //set property explicitly as test requires broker to start to enable check for ERRORED nodes
- setSystemProperty(Broker.BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD, String.valueOf(Boolean.FALSE));
-
- int intruderPort = getNextAvailable(Collections.max(_groupCreator.getBdbPortNumbers()) + 1);
+ int intruderPort =
+ new PortHelper().getNextAvailable(Arrays.stream(getBrokerAdmin().getBdbPorts()).max().getAsInt() + 1);
String nodeName = "intruder";
- String nodeHostPort = _groupCreator.getIpAddressOfBrokerHost() + ":" + intruderPort;
+ String nodeHostPort = getBrokerAdmin().getHost() + ":" + intruderPort;
File environmentPathFile = Files.createTempDirectory("qpid-work-intruder").toFile();
try
{
environmentPathFile.mkdirs();
ReplicationConfig replicationConfig =
- new ReplicationConfig(_groupCreator.getGroupName(), nodeName, nodeHostPort);
- replicationConfig.setHelperHosts(_groupCreator.getHelperHostPort());
+ new ReplicationConfig("test", nodeName, nodeHostPort);
+ replicationConfig.setHelperHosts(getBrokerAdmin().getHelperHostPort());
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
@@ -775,7 +834,9 @@ public class MultiNodeTest extends QpidBrokerTestCase
Durability.ReplicaAckPolicy.SIMPLE_MAJORITY));
final String currentThreadName = Thread.currentThread().getName();
- try(ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig))
+ try (ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile,
+ replicationConfig,
+ envConfig))
{
LOGGER.debug("Intruder started");
}
@@ -784,23 +845,24 @@ public class MultiNodeTest extends QpidBrokerTestCase
Thread.currentThread().setName(currentThreadName);
}
- for (int port : _groupCreator.getBrokerPortNumbersForNodes())
+ Set<Integer> ports =
+ Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
+ for (int port : ports)
{
- _groupCreator.awaitNodeToAttainAttributeValue(port,
- port,
- BDBHAVirtualHostNode.STATE,
- State.ERRORED.name());
+ getBrokerAdmin().awaitNodeToAttainAttributeValue(port,
+ BDBHAVirtualHostNode.STATE,
+ State.ERRORED.name());
}
- _groupCreator.stopCluster();
- _groupCreator.startCluster();
+ getBrokerAdmin().stop();
+ getBrokerAdmin().start(false);
- for (int port : _groupCreator.getBrokerPortNumbersForNodes())
+ for (int port : ports)
{
- _groupCreator.awaitNodeToAttainAttributeValue(port,
- port,
- BDBHAVirtualHostNode.STATE,
- State.ERRORED.name());
+ getBrokerAdmin().awaitNodeToAttainAttributeValue(port,
+
+ BDBHAVirtualHostNode.STATE,
+ State.ERRORED.name());
}
}
finally
@@ -844,10 +906,10 @@ public class MultiNodeTest extends QpidBrokerTestCase
if (!_failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS))
{
LOGGER.warn("Failover did not occur, dumping threads:\n\n" + TestUtils.dumpThreads() + "\n");
- Map<Integer,String> threadDumps = _groupCreator.groupThreadumps();
- for (Map.Entry<Integer,String> entry : threadDumps.entrySet())
+ Map<Integer, String> threadDumps = getBrokerAdmin().groupThreadDumps();
+ for (Map.Entry<Integer, String> entry : threadDumps.entrySet())
{
- LOGGER.warn("Broker {} thread dump:\n\n {}" , entry.getKey(), entry.getValue());
+ LOGGER.warn("Broker {} thread dump:\n\n {}", entry.getKey(), entry.getValue());
}
}
assertEquals("Failover did not occur", 0, _failoverCompletionLatch.getCount());
@@ -870,5 +932,4 @@ public class MultiNodeTest extends QpidBrokerTestCase
return _failoverStarted;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
index 3e0783d..151592c 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
@@ -19,157 +19,151 @@
*/
package org.apache.qpid.server.store.berkeleydb.replication;
-import java.util.Collections;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
import java.util.Map;
import javax.jms.Connection;
import javax.jms.JMSException;
-import javax.jms.Session;
+import javax.jms.Queue;
+
+import org.junit.Test;
-import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
-import org.apache.qpid.systests.ConnectionBuilder;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.systests.Utils;
+import org.apache.qpid.tests.utils.RunBrokerAdmin;
-public class TwoNodeTest extends QpidBrokerTestCase
+@RunBrokerAdmin(type = "BDB-HA")
+@GroupConfig(numberOfNodes = 2, groupName = "test")
+public class TwoNodeTest extends GroupJmsTestBase
{
- private static final String VIRTUAL_HOST = "test";
-
- private static final int NUMBER_OF_NODES = 2;
-
- private GroupCreator _groupCreator;
- /** Used when expectation is client will not (re)-connect */
- private ConnectionBuilder _positiveFailoverBuilder;
-
- /** Used when expectation is client will not (re)-connect */
- private ConnectionBuilder _negativeFailoverBuilder;
-
- @Override
- protected void setUp() throws Exception
+ @Test
+ public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception
{
- assertTrue(isJavaBroker());
- assertTrue(isBrokerStorePersistent());
-
- super.setUp();
-
- _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
- }
+ final Connection initialConnection = getConnectionBuilder().build();
+ int masterPort;
+ Queue queue;
+ try
+ {
+ queue = createTestQueue(initialConnection);
+ masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+ getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+ }
+ finally
+ {
+ initialConnection.close();
+ }
+ getBrokerAdmin().stop();
+ getBrokerAdmin().startNode(masterPort);
- @Override
- public void startDefaultBroker() throws Exception
- {
- // Don't start default broker provided by QBTC.
+ assertProduceConsume(queue);
}
- private void startCluster(boolean designedPrimary) throws Exception
+ @Test
+ public void testClusterRestartWithoutDesignatedPrimary() throws Exception
{
- _groupCreator.configureClusterNodes();
- _groupCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary);
- _positiveFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes();
- _negativeFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(200, 2);
- _groupCreator.startCluster();
- }
+ Queue queue;
+ final Connection initialConnection = getConnectionBuilder().build();
+ try
+ {
+ queue = createTestQueue(initialConnection);
+ assertThat(Utils.produceConsume(initialConnection, queue), is(equalTo(true)));
+ }
+ finally
+ {
+ initialConnection.close();
+ }
- public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception
- {
- startCluster(true);
-
- final Connection initialConnection = _positiveFailoverBuilder.build();
- Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- session.close();
-
- int masterPort = _groupCreator.getBrokerPortNumberFromConnection(initialConnection);
- assertProducingConsuming(initialConnection);
- initialConnection.close();
- _groupCreator.stopCluster();
- _groupCreator.startNode(masterPort);
- final Connection secondConnection = _positiveFailoverBuilder.build();
- assertProducingConsuming(secondConnection);
- secondConnection.close();
- }
+ getBrokerAdmin().stop();
+ getBrokerAdmin().start();
- public void testClusterRestartWithoutDesignatedPrimary() throws Exception
- {
- startCluster(false);
-
- final Connection initialConnection = _positiveFailoverBuilder.build();
- Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- session.close();
-
- assertProducingConsuming(initialConnection);
- initialConnection.close();
- _groupCreator.stopCluster();
- _groupCreator.startClusterParallel();
- final Connection secondConnection = _positiveFailoverBuilder.build();
- assertProducingConsuming(secondConnection);
- secondConnection.close();
+ assertProduceConsume(queue);
}
+ @Test
public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception
{
- startCluster(true);
- _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+ int masterPort;
+ Queue queue;
+ final Connection initialConnection = getConnectionBuilder().build();
+ try
+ {
+ masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+ queue = createTestQueue(initialConnection);
+ getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+ }
+ finally
+ {
+ initialConnection.close();
+ }
- final Connection connection = _positiveFailoverBuilder.build();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- session.close();
+ int replicaPort = getBrokerAdmin().getAmqpPort(masterPort);
+ getBrokerAdmin().stopNode(replicaPort);
- assertNotNull("Expected to get a valid connection to primary", connection);
- assertProducingConsuming(connection);
+ assertProduceConsume(queue);
}
+ @Test
public void testPersistentOperationsFailOnNonDesignatedPrimaryAfterSecondaryStopped() throws Exception
{
- if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
+ int masterPort;
+ Queue queue ;
+ final Connection initialConnection = getConnectionBuilder().build();
+ try
{
- // TODO - there seems to be a client defect when a JMS operation is interrupted
- // by a graceful connection close from the client side.
- return;
+ masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+ queue = createTestQueue(initialConnection);
+ }
+ finally
+ {
+ initialConnection.close();
}
- startCluster(false);
-
- final Connection initialConnection = _negativeFailoverBuilder.build();
- Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- initialConnection.close();
+ int replicaPort = getBrokerAdmin().getAmqpPort(masterPort);
- _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+ getBrokerAdmin().stopNode(replicaPort);
try
{
-
- Connection connection = _negativeFailoverBuilder.build();
- assertProducingConsuming(connection);
+ Connection connection = getConnectionBuilder().setFailoverReconnectDelay(SHORT_FAILOVER_CONNECTDELAY)
+ .setFailoverReconnectAttempts(SHORT_FAILOVER_CYCLECOUNT)
+ .build();
+ Utils.produceConsume(connection, queue);
fail("Exception not thrown");
}
catch(JMSException e)
{
- // JMSException should be thrown either on getConnection, or produce/consume
+ // JMSException should be thrown either on connection open, or produce/consume
// depending on whether the relative timing of the node discovering that the
// secondary has gone.
}
}
+ @Test
public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception
{
- if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
+ int masterPort;
+ final Connection initialConnection = getConnectionBuilder().build();
+ try
{
- // TODO - there seems to be a client defect when a JMS operation is interrupted
- // by a graceful connection close from the client side.
- return;
+ masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+ getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+ }
+ finally
+ {
+ initialConnection.close();
}
- startCluster(true);
- _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary());
-
+ getBrokerAdmin().stopNode(masterPort);
try
{
- _negativeFailoverBuilder.build();
+ getConnectionBuilder().setFailoverReconnectDelay(SHORT_FAILOVER_CONNECTDELAY)
+ .setFailoverReconnectAttempts(SHORT_FAILOVER_CYCLECOUNT)
+ .build();
fail("Connection not expected");
}
catch (JMSException e)
@@ -178,70 +172,92 @@ public class TwoNodeTest extends QpidBrokerTestCase
}
}
+ @Test
public void testInitialDesignatedPrimaryStateOfNodes() throws Exception
{
- startCluster(true);
+ int masterPort;
+ final Connection initialConnection = getConnectionBuilder().build();
+ try
+ {
+ masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+ getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+ }
+ finally
+ {
+ initialConnection.close();
+ }
+
+ Map<String, Object>
+ primaryNodeAttributes = getBrokerAdmin().getNodeAttributes(masterPort);
+ assertThat("Expected primary node to be set as designated primary",
+ primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(true)));
- Map<String, Object> primaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary());
- assertTrue("Expected primary node to be set as designated primary",
- (Boolean) primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
+ int replicaPort = getBrokerAdmin().getAmqpPort(masterPort);
- Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode());
- assertFalse("Expected secondary node to NOT be set as designated primary",
- (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
+ Map<String, Object> secondaryNodeAttributes = getBrokerAdmin().getNodeAttributes(replicaPort);
+ assertThat("Expected secondary node to NOT be set as designated primary",
+ secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(false)));
}
+ @Test
public void testSecondaryDesignatedAsPrimaryAfterOriginalPrimaryStopped() throws Exception
{
- startCluster(true);
-
- final Connection initialConnection = _positiveFailoverBuilder.build();
- Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- initialConnection.close();
+ int masterPort;
+ Queue queue;
+ final Connection initialConnection = getConnectionBuilder().build();
+ try
+ {
+ masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+ getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+ queue = createTestQueue(initialConnection);
+ }
+ finally
+ {
+ initialConnection.close();
+ }
- _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary());
+ getBrokerAdmin().stopNode(masterPort);
+ int replicaPort = getBrokerAdmin().getAmqpPort(masterPort);
- Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode());
- assertFalse("Expected node to NOT be set as designated primary", (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
+ Map<String, Object> secondaryNodeAttributes = getBrokerAdmin().getNodeAttributes(replicaPort);
+ assertThat("Expected secondary node to NOT be set as designated primary",
+ secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(false)));
- _groupCreator.setNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode(), Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true));
+ getBrokerAdmin().setDesignatedPrimary(replicaPort, true);
+ getBrokerAdmin().awaitNodeRole(replicaPort, "MASTER");
- int timeout = 5000;
- long limit = System.currentTimeMillis() + timeout;
- while( !((Boolean)secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)) && System.currentTimeMillis() < limit)
- {
- Thread.sleep(100);
- secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode());
- }
- assertTrue("Expected secondary to transition to primary within " + timeout, (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
-
- final Connection connection = _positiveFailoverBuilder.build();
- assertNotNull("Expected to get a valid connection to new primary", connection);
- assertProducingConsuming(connection);
+ assertProduceConsume(queue);
}
+ @Test
public void testSetDesignatedAfterReplicaBeingStopped() throws Exception
{
- startCluster(false);
-
- final Connection initialConnection = _positiveFailoverBuilder.build();
- Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
- getJmsProvider().createQueue(session, getTestQueueName());
- initialConnection.close();
+ final Connection initialConnection = getConnectionBuilder().build();
+ int masterPort;
+ Queue queue;
+ try
+ {
+ masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+ queue = createTestQueue(initialConnection);
+ }
+ finally
+ {
+ initialConnection.close();
+ }
- _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+ int replicaPort = getBrokerAdmin().getAmqpPort(masterPort);
+ getBrokerAdmin().stopNode(replicaPort);
- Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary());
- assertFalse("Expected node to NOT be set as designated primary", (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
+ Map<String, Object>
+ primaryNodeAttributes = getBrokerAdmin().getNodeAttributes(masterPort);
+ assertThat("Expected node to NOT be set as designated primary",
+ primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(false)));
- _groupCreator.setNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary(), Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true));
- _groupCreator.awaitNodeToAttainRole(_groupCreator.getBrokerPortNumberOfPrimary(), "MASTER" );
+ getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+ getBrokerAdmin().awaitNodeRole(masterPort, "MASTER");
- final Connection connection = _positiveFailoverBuilder.build();
- assertNotNull("Expected to get a valid connection to primary", connection);
- assertProducingConsuming(connection);
+ assertProduceConsume(queue);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2e8d623..7cd016a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -201,6 +201,7 @@
<module>systests/protocol-tests-amqp-0-8</module>
<module>systests/protocol-tests-amqp-0-10</module>
<module>systests/protocol-tests-amqp-1-0</module>
+ <module>systests/qpid-systests-spawn-admin</module>
<module>systests/end-to-end-conversion-tests</module>
<module>perftests</module>
<module>qpid-perftests-systests</module>
@@ -453,6 +454,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-spawn-admin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- External dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java b/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java
index a41ee39..91255ef 100644
--- a/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java
+++ b/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java
@@ -20,6 +20,8 @@
package org.apache.qpid.tests.http;
+import static org.apache.qpid.systests.Utils.getJmsProvider;
+
import java.net.InetSocketAddress;
import javax.jms.Connection;
@@ -32,11 +34,9 @@ import org.junit.Rule;
import org.junit.rules.TestName;
import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.systests.AmqpManagementFacade;
import org.apache.qpid.systests.ConnectionBuilder;
import org.apache.qpid.systests.JmsProvider;
-import org.apache.qpid.systests.QpidJmsClient0xProvider;
-import org.apache.qpid.systests.QpidJmsClientProvider;
+import org.apache.qpid.systests.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -59,17 +59,7 @@ public abstract class HttpTestBase extends BrokerAdminUsingTestBase
_helper = new HttpTestHelper(getBrokerAdmin(),
config != null && config.useVirtualHostAsHost() ? getVirtualHost() : null);
- Protocol protocol = getProtocol();
- AmqpManagementFacade managementFacade = new AmqpManagementFacade(protocol);
- if (protocol == Protocol.AMQP_1_0)
- {
- _jmsProvider = new QpidJmsClientProvider(managementFacade);
- }
- else
- {
- _jmsProvider = new QpidJmsClient0xProvider();
- }
-
+ _jmsProvider = getJmsProvider();
}
@After
@@ -121,14 +111,12 @@ public abstract class HttpTestBase extends BrokerAdminUsingTestBase
protected static long getReceiveTimeout()
{
- return Long.getLong("qpid.test_receive_timeout", 1000L);
+ return Utils.getReceiveTimeout();
}
protected static Protocol getProtocol()
{
- return Protocol.valueOf("AMQP_" + System.getProperty("broker.version", "0-9-1")
- .replace('-', '_')
- .replace('.', '_'));
+ return Utils.getProtocol();
}
protected String getTestName()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java
index 647d34a..5ac57cf 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java
@@ -492,10 +492,12 @@ public class AmqpManagementFacade
return new HashMap<>(bodyMap);
}
}
- throw new IllegalArgumentException("Management read failed : "
- + response.getStringProperty("statusCode")
- + " - "
- + response.getStringProperty("statusDescription"));
+ throw new AmqpManagementFacade.OperationUnsuccessfulException("Management read failed : "
+ + response.getStringProperty("statusCode")
+ + " - "
+ + response.getStringProperty(
+ "statusDescription"),
+ response.getIntProperty("statusCode"));
}
finally
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org