You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text copy.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/01/25 12:24:37 UTC

svn commit: r1726609 - in /qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication: GroupCreator.java MultiNodeTest.java

Author: orudyy
Date: Mon Jan 25 11:24:37 2016
New Revision: 1726609

URL: http://svn.apache.org/viewvc?rev=1726609&view=rev
Log:
QPID-6999: Add a system test to verify broker behaviour when transaction commits fail after the majority of nodes has been lost

Modified:
    qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
    qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java

Modified: qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java?rev=1726609&r1=1726608&r2=1726609&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java (original)
+++ qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java Mon Jan 25 11:24:37 2016
@@ -25,6 +25,7 @@ import java.io.StringWriter;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -437,32 +438,34 @@ public class GroupCreator
         }
     }
 
-    public void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception
+    public void awaitNodeToAttainRole(int brokerPort, String... desiredRole) throws Exception
     {
         awaitNodeToAttainRole(brokerPort, brokerPort, desiredRole);
     }
 
-    public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String desiredRole) throws Exception
+    public void awaitNodeToAttainRole(int localNodePort, int remoteNodePort, String... desiredRole) throws Exception
     {
         awaitNodeToAttainAttributeValue(localNodePort, remoteNodePort, BDBHARemoteReplicationNode.ROLE, desiredRole);
     }
 
-    public void awaitNodeToAttainAttributeValue(int localNodePort, int remoteNodePort, String attributeName, String desiredValue) throws Exception
+    public void awaitNodeToAttainAttributeValue(int localNodePort, int remoteNodePort, String attributeName, String... desiredValue) throws Exception
     {
         final long startTime = System.currentTimeMillis();
         Map<String, Object> data = Collections.emptyMap();
 
-        while(!desiredValue.equals(data.get(attributeName)) && (System.currentTimeMillis() - startTime) < 30000)
+        List<String> desiredValues = Arrays.asList( desiredValue );
+        while(!desiredValues.contains(data.get(attributeName)) && (System.currentTimeMillis() - startTime) < 30000)
         {
-            LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' to transit into " + desiredValue + " role");
+            LOGGER.debug("Awaiting node '" + getNodeNameForBrokerPort(remoteNodePort) + "' attribute " +
+                         attributeName  + " to have value set to any of " + desiredValues);
             data = getNodeAttributes(localNodePort, remoteNodePort);
-            if (!desiredValue.equals(data.get(attributeName)))
+            if (!desiredValue.equals(String.valueOf(data.get(attributeName))))
             {
                 Thread.sleep(1000);
             }
         }
         LOGGER.debug("Node '" + getNodeNameForBrokerPort(remoteNodePort) + "' attribute  '" + attributeName + "' is " + data.get(attributeName));
-        Assert.assertEquals("Unexpected " + attributeName + " at " + localNodePort, desiredValue, data.get(attributeName));
+        Assert.assertTrue("Unexpected " + attributeName + " at " + localNodePort, desiredValues.contains(String.valueOf(data.get(attributeName))));
     }
 
     public RestTestHelper createRestTestHelper(int brokerPort)
@@ -493,6 +496,16 @@ public class GroupCreator
         return permittedNodes;
     }
 
+    public Map<Integer, String> groupThreadumps()
+    {
+        Map<Integer,String> threadDumps = new HashMap<>();
+        for(GroupMember m: _members.values())
+        {
+            threadDumps.put(m._amqpPort, m._brokerHolder.dumpThreads());
+        }
+        return threadDumps;
+    }
+
     private class GroupMember
     {
         int _amqpPort;

Modified: qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java?rev=1726609&r1=1726608&r2=1726609&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java (original)
+++ qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java Mon Jan 25 11:24:37 2016
@@ -21,10 +21,14 @@ package org.apache.qpid.server.store.ber
 
 import java.io.File;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -34,8 +38,10 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.TransactionRolledBackException;
 
 import com.sleepycat.je.Durability;
@@ -50,8 +56,8 @@ import org.apache.qpid.jms.ConnectionLis
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.NodeRole;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.test.utils.TestUtils;
 import org.apache.qpid.util.FileUtils;
@@ -221,10 +227,7 @@ public class MultiNodeTest extends QpidB
 
         _failoverListener.awaitFailoverCompletion(20000);
 
-        Map<String, Object> attrs = _groupCreator.getNodeAttributes(activeBrokerPort);
-        String roleInGroup = (String) attrs.get(BDBHARemoteReplicationNode.ROLE);
-        assertTrue("Original master should have rejoined the group as either MASTER or REPLICA, but " + roleInGroup,
-                   "MASTER".equals(roleInGroup) || "REPLICA".equals(roleInGroup));
+        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "MASTER", "REPLICA");
     }
 
     public void testPersistentMessagesAvailableAfterFailover() throws Exception
@@ -458,6 +461,178 @@ public class MultiNodeTest extends QpidB
 
     }
 
+    public void testInFlightTransactionsWhilstMajorityIsLost() throws Exception
+    {
+        int connectionNumber = 20;
+        ExecutorService executorService = Executors.newFixedThreadPool(connectionNumber + NUMBER_OF_NODES -1);
+        try
+        {
+            ConnectionURL connectionUrl = _groupCreator.getConnectionUrlForAllClusterNodes(100, 0, 100);
+
+            final Connection consumerConnection = getConnection(connectionUrl);
+            consumerConnection.start();
+
+            final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Destination destination = consumerSession.createQueue(getTestQueueName());
+            consumerSession.createConsumer(destination).setMessageListener(new MessageListener()
+            {
+                @Override
+                public void onMessage(final Message message)
+                {
+                    try
+                    {
+                        LOGGER.info("Message received: " + ((TextMessage) message).getText());
+                    }
+                    catch (JMSException e)
+                    {
+                        LOGGER.error("Failure to get message text", e);
+                    }
+                }
+            });
+
+            final Connection[] connections = new Connection[connectionNumber];
+            final Session[] sessions = new Session[connectionNumber];
+            for (int i = 0; i < sessions.length; i++)
+            {
+                connections[i] = getConnection(connectionUrl);
+                sessions[i] = connections[i].createSession(true, Session.SESSION_TRANSACTED);
+                LOGGER.info("Session {} is created", i);
+            }
+
+            List<Integer> ports = new ArrayList<>(_groupCreator.getBrokerPortNumbersForNodes());
+
+            int maxMessageSize = 10;
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < maxMessageSize - 2; i++)
+            {
+                sb.append("X");
+            }
+            String messageText = sb.toString();
+            for (int n = 0; n < NUMBER_OF_NODES; n++)
+            {
+                LOGGER.info("Starting iteration {}", n);
+
+                FailoverAwaitingListener failoverListener = new FailoverAwaitingListener(connectionNumber);
+
+                for (int i = 0; i < sessions.length; i++)
+                {
+                    AMQConnection connection = (AMQConnection)connections[i];
+                    connection.setConnectionListener(failoverListener);
+
+                    MessageProducer producer = sessions[i].createProducer(destination);
+                    Message message = sessions[i].createTextMessage(messageText + "-" + i);
+                    producer.send(message);
+                }
+
+                LOGGER.info("All publishing sessions have uncommitted transactions");
+
+                final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connections[0]);
+                LOGGER.info("Active connection port " + activeBrokerPort);
+
+                List<Integer> inactivePorts = new ArrayList<>(ports);
+                inactivePorts.remove(new Integer(activeBrokerPort));
+
+                final CountDownLatch latch = new CountDownLatch(inactivePorts.size());
+                for (int port : inactivePorts)
+                {
+                    final int inactiveBrokerPort = port;
+                    LOGGER.info("Stop node for inactive broker on port " + inactiveBrokerPort);
+
+                    executorService.submit(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                _groupCreator.setNodeAttributes(inactiveBrokerPort,
+                                                                inactiveBrokerPort,
+                                                                Collections.<String, Object>singletonMap(
+                                                                        BDBHAVirtualHostNode.DESIRED_STATE,
+                                                                        State.STOPPED.name()));
+                            }
+                            catch (Exception e)
+                            {
+                                LOGGER.error("Failed to stop node on broker with port " + inactiveBrokerPort, e);
+                            }
+                            finally
+                            {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                }
+
+                latch.await(500, TimeUnit.MILLISECONDS);
+
+                LOGGER.info("Committing transactions in parallel to provoke a lot of syncing to disk");
+                for (int i = 0; i < sessions.length; i++)
+                {
+                    final Session session = sessions[i];
+                    executorService.submit(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                session.commit();
+                            }
+                            catch (JMSException e)
+                            {
+                                // majority of commits might fail due to insufficient replicas
+                            }
+                        }
+                    });
+                }
+
+                LOGGER.info("Verify that stopped nodes are in detached role");
+                for (int port : inactivePorts)
+                {
+                    _groupCreator.awaitNodeToAttainRole(port, NodeRole.DETACHED.name());
+                }
+
+                LOGGER.info("Start stopped nodes");
+                for (int port : inactivePorts)
+                {
+                    LOGGER.info("Starting node for inactive broker on port " + port);
+                    try
+                    {
+                        _groupCreator.setNodeAttributes(port,
+                                                        port,
+                                                        Collections.<String, Object>singletonMap(
+                                                                BDBHAVirtualHostNode.DESIRED_STATE,
+                                                                State.ACTIVE.name()));
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.error("Failed to start node on broker with port " + port, e);
+                    }
+                }
+
+                for (int port : ports)
+                {
+                    _groupCreator.awaitNodeToAttainRole(port, "REPLICA", "MASTER");
+                }
+
+                if (failoverListener.isFailoverStarted())
+                {
+                    LOGGER.info("Waiting for failover completion");
+                    failoverListener.awaitFailoverCompletion(20000 * connectionNumber);
+                    LOGGER.info("Failover has finished");
+                }
+                else
+                {
+                    LOGGER.info("Failover did not occur");
+                }
+            }
+        }
+        finally
+        {
+            executorService.shutdown();
+        }
+    }
+
     public void testQuorumOverride() throws Exception
     {
         final Connection connection = getConnection(_positiveFailoverUrl);
@@ -591,7 +766,18 @@ public class MultiNodeTest extends QpidB
 
     private final class FailoverAwaitingListener implements ConnectionListener
     {
-        private final CountDownLatch _failoverCompletionLatch = new CountDownLatch(1);
+        private final CountDownLatch _failoverCompletionLatch;
+        private volatile boolean _failoverStarted;
+
+        private FailoverAwaitingListener()
+        {
+            this(1);
+        }
+
+        private FailoverAwaitingListener(int connectionNumber)
+        {
+            _failoverCompletionLatch = new CountDownLatch(connectionNumber);
+        }
 
         @Override
         public boolean preResubscribe()
@@ -600,8 +786,9 @@ public class MultiNodeTest extends QpidB
         }
 
         @Override
-        public boolean preFailover(boolean redirect)
+        public synchronized boolean preFailover(boolean redirect)
         {
+            _failoverStarted = true;
             return true;
         }
 
@@ -609,7 +796,12 @@ public class MultiNodeTest extends QpidB
         {
             if (!_failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS))
             {
-                LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n");
+                LOGGER.warn("Failover did not occur, dumping threads:\n\n" + TestUtils.dumpThreads() + "\n");
+                Map<Integer,String> threadDumps = _groupCreator.groupThreadumps();
+                for (Map.Entry<Integer,String> entry : threadDumps.entrySet())
+                {
+                    LOGGER.warn("Broker {} thread dump:\n\n {}" , entry.getKey(), entry.getValue());
+                }
             }
             assertEquals("Failover did not occur", 0, _failoverCompletionLatch.getCount());
         }
@@ -635,6 +827,11 @@ public class MultiNodeTest extends QpidB
         public void bytesReceived(long count)
         {
         }
+
+        public synchronized boolean isFailoverStarted()
+        {
+            return _failoverStarted;
+        }
     }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org