You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/09/23 08:54:51 UTC
svn commit: r1626953 - in /qpid/trunk/qpid/java/bdbstore:
src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
Author: kwall
Date: Tue Sep 23 06:54:51 2014
New Revision: 1626953
URL: http://svn.apache.org/r1626953
Log:
QPID-6102: [Java Broker] HA Prevent InsufficientReplica/InsufficientAckExceptions and other exceptions that require JE environment restart from causing Broker shutdown.
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1626953&r1=1626952&r2=1626953&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Sep 23 06:54:51 2014
@@ -90,6 +90,7 @@ import org.apache.qpid.server.store.berk
import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.DaemonThreadFactory;
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
@@ -384,6 +385,7 @@ public class ReplicatedEnvironmentFacade
if (restart)
{
tryToRestartEnvironment(dbe);
+ throw new ConnectionScopedRuntimeException(noMajority ? "Required number of nodes not reachable" : "Underlying JE environment is being restarted", dbe);
}
return dbe;
}
Modified: qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java?rev=1626953&r1=1626952&r2=1626953&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java Tue Sep 23 06:54:51 2014
@@ -25,13 +25,18 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TransactionRolledBackException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentConfig;
@@ -262,6 +267,112 @@ public class MultiNodeTest extends QpidB
_groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
}
+ public void testTransferMasterWhilstMessagesInFlight() throws Exception
+ {
+ final Connection connection = getConnection(_positiveFailoverUrl);
+ connection.start();
+ ((AMQConnection) connection).setConnectionListener(_failoverListener);
+ // Flags and latches shared between this test thread and the background producer thread.
+ final AtomicBoolean masterTransfered = new AtomicBoolean(false);
+ final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ final AtomicReference<Exception> workerException = new AtomicReference<>();
+ final CountDownLatch producedOneBefore = new CountDownLatch(1);
+ final CountDownLatch producedOneAfter = new CountDownLatch(1);
+ final CountDownLatch workerShutdown = new CountDownLatch(1);
+
+ // Background worker: sends and commits one message per loop iteration until keepRunning is cleared.
+ Runnable producer = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ int count = 0;
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageProducer producer = session.createProducer(destination);
+ session.createConsumer(destination).close();
+
+ while (keepRunning.get())
+ {
+ String messageText = "message" + count;
+ try
+ {
+ Message message = session.createTextMessage(messageText);
+ producer.send(message);
+ session.commit();
+ LOGGER.debug("Sent message " + count);
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ // Ignored so the loop keeps going; presumably expected while mastership is being transferred.
+ }
+ catch(JMSException je)
+ {
+ // Also ignored. NOTE(review): TransactionRolledBackException is itself a JMSException, so both catches behave alike.
+ }
+ // NOTE(review): the latches below are counted down even when the send/commit above failed.
+ producedOneBefore.countDown();
+ // masterTransfered is set by the test thread once the role change has been requested.
+ if (masterTransfered.get())
+ {
+ producedOneAfter.countDown();
+ }
+ count++;
+ }
+ }
+ catch (Exception e)
+ {
+ workerException.set(e);
+ }
+ finally
+ {
+ workerShutdown.countDown();
+ }
+ }
+
+ };
+
+ Thread backgroundWorker = new Thread(producer);
+ backgroundWorker.start();
+ // Wait until the worker has completed at least one loop iteration before changing roles.
+ boolean workerRunning = producedOneBefore.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue(workerRunning);
+
+ final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+ LOGGER.info("Active connection port " + activeBrokerPort);
+
+ final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
+ LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+ // Confirm the inactive node reports REPLICA before asking it to become MASTER.
+ _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
+ Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+ // Request the transfer by setting the node's role attribute to MASTER, then tell the worker.
+ _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+ masterTransfered.set(true);
+ // Allow up to 20000 (presumably milliseconds) for the client to complete failover.
+ _failoverListener.awaitFailoverCompletion(20000);
+ LOGGER.info("Failover has finished");
+
+ attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+ assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+
+ _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+ // The worker must get through at least one more loop iteration after the transfer was requested.
+ boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue("Should have successfully produced at least one message after transfer complete", producedMore);
+ // Stop the worker and wait for it to exit its loop.
+ keepRunning.set(false);
+ boolean shutdown = workerShutdown.await(5000, TimeUnit.MILLISECONDS);
+ assertTrue("Worker thread should have shutdown", shutdown);
+ // Any exception that escaped the worker's loop was captured and fails the test here.
+ backgroundWorker.join(5000);
+ assertNull(workerException.get());
+
+ }
+
public void testQuorumOverride() throws Exception
{
final Connection connection = getConnection(_positiveFailoverUrl);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org