You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/01/25 12:24:37 UTC
svn commit: r1726609 - in
/qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication:
GroupCreator.java MultiNodeTest.java
Author: orudyy
Date: Mon Jan 25 11:24:37 2016
New Revision: 1726609
URL: http://svn.apache.org/viewvc?rev=1726609&view=rev
Log:
QPID-6999: Add a system test to verify broker behaviour when transaction commits fail after the majority has been lost
Modified:
qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
Modified: qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java?rev=1726609&r1=1726608&r2=1726609&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java (original)
+++ qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java Mon Jan 25 11:24:37 2016
@@ -25,6 +25,7 @@ import java.io.StringWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -437,32 +438,34 @@ public class GroupCreator
}
}
-    public void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception
+    /**
+     * Waits for a node to report one of the given roles, passing the same port as both the
+     * local and the remote node port of the three-argument overload.
+     *
+     * @param brokerPort  port used as both {@code localNodePort} and {@code remoteNodePort}
+     * @param desiredRole one or more acceptable role names
+     * @throws Exception if the underlying wait fails or the role is not attained
+     */
+    public void awaitNodeToAttainRole(int brokerPort, String... desiredRole) throws Exception
     {
         awaitNodeToAttainRole(brokerPort, brokerPort, desiredRole);
     }
-    public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String desiredRole) throws Exception
+    /**
+     * Waits for the {@code BDBHARemoteReplicationNode.ROLE} attribute to take one of the given values.
+     *
+     * @param localNodePort  forwarded unchanged to {@code awaitNodeToAttainAttributeValue}
+     * @param remoteNodePort forwarded unchanged to {@code awaitNodeToAttainAttributeValue}
+     * @param desiredRole    one or more acceptable role names
+     * @throws Exception if the underlying wait fails or the role is not attained
+     */
+    public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String... desiredRole) throws Exception
     {
         awaitNodeToAttainAttributeValue(localNodePort, remoteNodePort, BDBHARemoteReplicationNode.ROLE, desiredRole);
     }
-    public void awaitNodeToAttainAttributeValue(int localNodePort, int remoteNodePort, String attributeName, String desiredValue) throws Exception
+    /**
+     * Polls the attributes returned by {@code getNodeAttributes(localNodePort, remoteNodePort)} until
+     * the string form of {@code attributeName} equals one of {@code desiredValue}, or until 30 seconds
+     * have elapsed, pausing one second between unsuccessful polls. Finally asserts that the last
+     * observed value is one of the desired values.
+     *
+     * @param localNodePort  first argument passed to {@code getNodeAttributes}
+     * @param remoteNodePort second argument passed to {@code getNodeAttributes}; also used to look up
+     *                       the node name for log messages
+     * @param attributeName  key of the attribute to inspect
+     * @param desiredValue   one or more acceptable values, compared against
+     *                       {@code String.valueOf(attributeValue)}
+     * @throws Exception if fetching the attributes fails or the wait is interrupted
+     */
+    public void awaitNodeToAttainAttributeValue(int localNodePort, int remoteNodePort, String attributeName, String... desiredValue) throws Exception
     {
         final long startTime = System.currentTimeMillis();
         Map<String, Object> data = Collections.emptyMap();
-        while(!desiredValue.equals(data.get(attributeName)) && (System.currentTimeMillis() - startTime) < 30000)
+        List<String> desiredValues = Arrays.asList( desiredValue );
+        // The loop condition, the sleep check and the final assertion all compare the
+        // String.valueOf(...) form of the attribute so that they cannot disagree.
+        while(!desiredValues.contains(String.valueOf(data.get(attributeName))) && (System.currentTimeMillis() - startTime) < 30000)
         {
-            LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' to transit into " + desiredValue + " role");
+            LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' attribute " +
+                         attributeName + " to have value set to any of " + desiredValues);
             data = getNodeAttributes(localNodePort, remoteNodePort);
-            if (!desiredValue.equals(data.get(attributeName)))
+            // Pause only when the freshly fetched value is still not acceptable; comparing the
+            // varargs array itself with a String would always be unequal and always sleep.
+            if (!desiredValues.contains(String.valueOf(data.get(attributeName))))
             {
                 Thread.sleep(1000);
             }
         }
         LOGGER.debug("Node '" + getNodeNameForBrokerPort(remoteNodePort) + "' attribute '" + attributeName + "' is " + data.get(attributeName));
-        Assert.assertEquals("Unexpected " + attributeName + " at " + localNodePort, desiredValue, data.get(attributeName));
+        Assert.assertTrue("Unexpected " + attributeName + " at " + localNodePort, desiredValues.contains(String.valueOf(data.get(attributeName))));
     }
public RestTestHelper createRestTestHelper(int brokerPort)
@@ -493,6 +496,16 @@ public class GroupCreator
return permittedNodes;
}
+    /**
+     * Builds a map of thread dumps for the members of this group.
+     *
+     * @return a new map from each member's {@code _amqpPort} to the result of calling
+     *         {@code dumpThreads()} on that member's {@code _brokerHolder}
+     */
+    public Map<Integer, String> groupThreadumps()
+    {
+        final Map<Integer, String> dumpsByAmqpPort = new HashMap<>();
+        for (final GroupMember member : _members.values())
+        {
+            final int amqpPort = member._amqpPort;
+            final String dump = member._brokerHolder.dumpThreads();
+            dumpsByAmqpPort.put(amqpPort, dump);
+        }
+        return dumpsByAmqpPort;
+    }
+
private class GroupMember
{
int _amqpPort;
Modified: qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java?rev=1726609&r1=1726608&r2=1726609&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java (original)
+++ qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java Mon Jan 25 11:24:37 2016
@@ -21,10 +21,14 @@ package org.apache.qpid.server.store.ber
import java.io.File;
import java.nio.file.Files;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -34,8 +38,10 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import com.sleepycat.je.Durability;
@@ -50,8 +56,8 @@ import org.apache.qpid.jms.ConnectionLis
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.NodeRole;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestUtils;
import org.apache.qpid.util.FileUtils;
@@ -221,10 +227,7 @@ public class MultiNodeTest extends QpidB
_failoverListener.awaitFailoverCompletion(20000);
- Map<String, Object> attrs = _groupCreator.getNodeAttributes(activeBrokerPort);
- String roleInGroup = (String) attrs.get(BDBHARemoteReplicationNode.ROLE);
- assertTrue("Original master should have rejoined the group as either MASTER or REPLICA, but " + roleInGroup,
- "MASTER".equals(roleInGroup) || "REPLICA".equals(roleInGroup));
+ _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "MASTER", "REPLICA");
}
public void testPersistentMessagesAvailableAfterFailover() throws Exception
@@ -458,6 +461,178 @@ public class MultiNodeTest extends QpidB
}
+    /**
+     * Commits in-flight transactions while the other group nodes are being stopped.
+     * <p>
+     * For {@code NUMBER_OF_NODES} iterations the test: publishes one uncommitted message on each of
+     * 20 transacted sessions; asynchronously requests {@code STOPPED} for every node except the one
+     * the first connection is attached to; commits all sessions in parallel (commit failures are
+     * tolerated); waits for the stopped nodes to report {@code DETACHED}; requests {@code ACTIVE}
+     * for them again; waits for every node to report {@code REPLICA} or {@code MASTER}; and, if the
+     * listener saw a failover start, waits for failover to complete.
+     *
+     * @throws Exception if setup, any awaited role, or failover completion fails
+     */
+    public void testInFlightTransactionsWhilstMajorityIsLost() throws Exception
+    {
+        int connectionNumber = 20;
+        // One thread per committing session plus NUMBER_OF_NODES - 1 further threads
+        // (presumably one per node that is stopped in each iteration).
+        ExecutorService executorService = Executors.newFixedThreadPool(connectionNumber + NUMBER_OF_NODES -1);
+        try
+        {
+            ConnectionURL connectionUrl = _groupCreator.getConnectionUrlForAllClusterNodes(100, 0, 100);
+
+            final Connection consumerConnection = getConnection(connectionUrl);
+            consumerConnection.start();
+
+            final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Destination destination = consumerSession.createQueue(getTestQueueName());
+            // The consumer only logs what it receives; nothing is asserted on the consumed messages.
+            consumerSession.createConsumer(destination).setMessageListener(new MessageListener()
+            {
+                @Override
+                public void onMessage(final Message message)
+                {
+                    try
+                    {
+                        LOGGER.info("Message received: " + ((TextMessage) message).getText());
+                    }
+                    catch (JMSException e)
+                    {
+                        LOGGER.error("Failure to get message text", e);
+                    }
+                }
+            });
+
+            final Connection[] connections = new Connection[connectionNumber];
+            final Session[] sessions = new Session[connectionNumber];
+            for (int i = 0; i < sessions.length; i++)
+            {
+                connections[i] = getConnection(connectionUrl);
+                sessions[i] = connections[i].createSession(true, Session.SESSION_TRANSACTED);
+                LOGGER.info("Session {} is created", i);
+            }
+
+            List<Integer> ports = new ArrayList<>(_groupCreator.getBrokerPortNumbersForNodes());
+
+            // Message body is (maxMessageSize - 2) 'X' characters; "-<session index>" is appended on send.
+            int maxMessageSize = 10;
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < maxMessageSize - 2; i++)
+            {
+                sb.append("X");
+            }
+            String messageText = sb.toString();
+            for (int n = 0; n < NUMBER_OF_NODES; n++)
+            {
+                LOGGER.info("Starting iteration {}", n);
+
+                // A fresh listener per iteration, counting down once per publishing connection.
+                FailoverAwaitingListener failoverListener = new FailoverAwaitingListener(connectionNumber);
+
+                for (int i = 0; i < sessions.length; i++)
+                {
+                    AMQConnection connection = (AMQConnection)connections[i];
+                    connection.setConnectionListener(failoverListener);
+
+                    MessageProducer producer = sessions[i].createProducer(destination);
+                    Message message = sessions[i].createTextMessage(messageText + "-" + i);
+                    producer.send(message);
+                }
+
+                LOGGER.info("All publishing sessions have uncommitted transactions");
+
+                final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connections[0]);
+                LOGGER.info("Active connection port " + activeBrokerPort);
+
+                List<Integer> inactivePorts = new ArrayList<>(ports);
+                // Box explicitly so that List.remove(Object) is chosen rather than remove(int index);
+                // Integer.valueOf replaces the deprecated Integer constructor.
+                inactivePorts.remove(Integer.valueOf(activeBrokerPort));
+
+                final CountDownLatch latch = new CountDownLatch(inactivePorts.size());
+                for (int port : inactivePorts)
+                {
+                    final int inactiveBrokerPort = port;
+                    LOGGER.info("Stop node for inactive broker on port " + inactiveBrokerPort);
+
+                    executorService.submit(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                _groupCreator.setNodeAttributes(inactiveBrokerPort,
+                                                                inactiveBrokerPort,
+                                                                Collections.<String, Object>singletonMap(
+                                                                        BDBHAVirtualHostNode.DESIRED_STATE,
+                                                                        State.STOPPED.name()));
+                            }
+                            catch (Exception e)
+                            {
+                                LOGGER.error("Failed to stop node on broker with port " + inactiveBrokerPort, e);
+                            }
+                            finally
+                            {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+
+                // Short best-effort wait whose result is intentionally ignored; presumably the commits
+                // below are meant to overlap with the nodes stopping -- confirm with the test author.
+                latch.await(500, TimeUnit.MILLISECONDS);
+
+                LOGGER.info("Committing transactions in parallel to provoke a lot of syncing to disk");
+                for (int i = 0; i < sessions.length; i++)
+                {
+                    final Session session = sessions[i];
+                    executorService.submit(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                session.commit();
+                            }
+                            catch (JMSException e)
+                            {
+                                // majority of commits might fail due to insufficient replicas;
+                                // tolerated, but logged so that failures remain diagnosable
+                                LOGGER.debug("Transaction commit failed, possibly due to insufficient replicas", e);
+                            }
+                        }
+                    });
+                }
+
+                LOGGER.info("Verify that stopped nodes are in detached role");
+                for (int port : inactivePorts)
+                {
+                    _groupCreator.awaitNodeToAttainRole(port, NodeRole.DETACHED.name());
+                }
+
+                LOGGER.info("Start stopped nodes");
+                for (int port : inactivePorts)
+                {
+                    LOGGER.info("Starting node for inactive broker on port " + port);
+                    try
+                    {
+                        _groupCreator.setNodeAttributes(port,
+                                                        port,
+                                                        Collections.<String, Object>singletonMap(
+                                                                BDBHAVirtualHostNode.DESIRED_STATE,
+                                                                State.ACTIVE.name()));
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.error("Failed to start node on broker with port " + port, e);
+                    }
+                }
+
+                // Every node, including the one that was never stopped, must end up REPLICA or MASTER.
+                for (int port : ports)
+                {
+                    _groupCreator.awaitNodeToAttainRole(port, "REPLICA", "MASTER");
+                }
+
+                if (failoverListener.isFailoverStarted())
+                {
+                    LOGGER.info("Waiting for failover completion");
+                    failoverListener.awaitFailoverCompletion(20000 * connectionNumber);
+                    LOGGER.info("Failover has finished");
+                }
+                else
+                {
+                    LOGGER.info("Failover did not occur");
+                }
+            }
+        }
+        finally
+        {
+            executorService.shutdown();
+        }
+    }
+
public void testQuorumOverride() throws Exception
{
final Connection connection = getConnection(_positiveFailoverUrl);
@@ -591,7 +766,18 @@ public class MultiNodeTest extends QpidB
private final class FailoverAwaitingListener implements ConnectionListener
{
- private final CountDownLatch _failoverCompletionLatch = new CountDownLatch(1);
+ private final CountDownLatch _failoverCompletionLatch;
+ private volatile boolean _failoverStarted;
+
+ private FailoverAwaitingListener()
+ {
+ this(1);
+ }
+
+ private FailoverAwaitingListener(int connectionNumber)
+ {
+ _failoverCompletionLatch = new CountDownLatch(connectionNumber);
+ }
@Override
public boolean preResubscribe()
@@ -600,8 +786,9 @@ public class MultiNodeTest extends QpidB
}
@Override
- public boolean preFailover(boolean redirect)
+ public synchronized boolean preFailover(boolean redirect)
{
+ _failoverStarted = true;
return true;
}
@@ -609,7 +796,12 @@ public class MultiNodeTest extends QpidB
{
if (!_failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS))
{
- LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n");
+ LOGGER.warn("Failover did not occur, dumping threads:\n\n" + TestUtils.dumpThreads() + "\n");
+ Map<Integer,String> threadDumps = _groupCreator.groupThreadumps();
+ for (Map.Entry<Integer,String> entry : threadDumps.entrySet())
+ {
+ LOGGER.warn("Broker {} thread dump:\n\n {}" , entry.getKey(), entry.getValue());
+ }
}
assertEquals("Failover did not occur", 0, _failoverCompletionLatch.getCount());
}
@@ -635,6 +827,11 @@ public class MultiNodeTest extends QpidB
public void bytesReceived(long count)
{
}
+
+ public synchronized boolean isFailoverStarted()
+ {
+ return _failoverStarted;
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org