You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/09/23 08:54:51 UTC

svn commit: r1626953 - in /qpid/trunk/qpid/java/bdbstore: src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java

Author: kwall
Date: Tue Sep 23 06:54:51 2014
New Revision: 1626953

URL: http://svn.apache.org/r1626953
Log:
QPID-6102: [Java Broker] HA Prevent InsufficientReplica/InsufficientAckExceptions and other exceptions that require JE environment restart from causing Broker shutdown.

Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1626953&r1=1626952&r2=1626953&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Sep 23 06:54:51 2014
@@ -90,6 +90,7 @@ import org.apache.qpid.server.store.berk
 import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry;
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
 import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.DaemonThreadFactory;
 
 public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
@@ -384,6 +385,7 @@ public class ReplicatedEnvironmentFacade
         if (restart)
         {
             tryToRestartEnvironment(dbe);
+            throw new ConnectionScopedRuntimeException(noMajority ? "Required number of nodes not reachable" : "Underlying JE environment is being restarted", dbe);
         }
         return dbe;
     }

Modified: qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java?rev=1626953&r1=1626952&r2=1626953&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java Tue Sep 23 06:54:51 2014
@@ -25,13 +25,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TransactionRolledBackException;
 
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.EnvironmentConfig;
@@ -262,6 +267,112 @@ public class MultiNodeTest extends QpidB
         _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
     }
 
+    public void testTransferMasterWhilstMessagesInFlight() throws Exception
+    {
+        final Connection connection = getConnection(_positiveFailoverUrl);
+        connection.start();
+        ((AMQConnection) connection).setConnectionListener(_failoverListener);
+
+        final AtomicBoolean masterTransfered = new AtomicBoolean(false);
+        final AtomicBoolean keepRunning = new AtomicBoolean(true);
+        final AtomicReference<Exception> workerException = new AtomicReference<>();
+        final CountDownLatch producedOneBefore = new CountDownLatch(1);
+        final CountDownLatch producedOneAfter = new CountDownLatch(1);
+        final CountDownLatch workerShutdown = new CountDownLatch(1);
+
+
+        Runnable producer = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    int count = 0;
+                    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                    Destination destination = session.createQueue(getTestQueueName());
+                    MessageProducer producer = session.createProducer(destination);
+                    session.createConsumer(destination).close();
+
+                    while (keepRunning.get())
+                    {
+                        String messageText = "message" + count;
+                        try
+                        {
+                            Message message = session.createTextMessage(messageText);
+                            producer.send(message);
+                            session.commit();
+                            LOGGER.debug("Sent message " + count);
+                        }
+                        catch (TransactionRolledBackException trbe)
+                        {
+
+                        }
+                        catch(JMSException je)
+                        {
+
+                        }
+
+                        producedOneBefore.countDown();
+
+                        if (masterTransfered.get())
+                        {
+                            producedOneAfter.countDown();
+                        }
+                        count++;
+                    }
+                }
+                catch (Exception e)
+                {
+                    workerException.set(e);
+                }
+                finally
+                {
+                    workerShutdown.countDown();
+                }
+            }
+
+        };
+
+        Thread backgroundWorker = new Thread(producer);
+        backgroundWorker.start();
+
+        boolean workerRunning = producedOneBefore.await(5000, TimeUnit.MILLISECONDS);
+        assertTrue(workerRunning);
+
+        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+        LOGGER.info("Active connection port " + activeBrokerPort);
+
+        final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
+        LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+
+        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
+        Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
+        assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+        _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+        masterTransfered.set(true);
+
+        _failoverListener.awaitFailoverCompletion(20000);
+        LOGGER.info("Failover has finished");
+
+        attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+        assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+
+        boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS);
+        assertTrue("Should have successfully produced at least one message after transfer complete", producedMore);
+
+        keepRunning.set(false);
+        boolean shutdown = workerShutdown.await(5000, TimeUnit.MILLISECONDS);
+        assertTrue("Worker thread should have shutdown", shutdown);
+
+        backgroundWorker.join(5000);
+        assertNull(workerException.get());
+
+    }
+
     public void testQuorumOverride() throws Exception
     {
         final Connection connection = getConnection(_positiveFailoverUrl);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org