You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/10/29 10:26:38 UTC
svn commit: r1635076 -
/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
Author: kwall
Date: Wed Oct 29 09:26:37 2014
New Revision: 1635076
URL: http://svn.apache.org/r1635076
Log:
QPID-6192: [Java Broker] Add supporting test case guarding case when failover occurs when busy
Modified:
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1635076&r1=1635075&r2=1635076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Wed Oct 29 09:26:37 2014
@@ -57,6 +57,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Test suite to test all possible failover corner cases
@@ -713,6 +714,106 @@ public class FailoverBehaviourTest exten
browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
+ public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception
+ {
+ doFailoverWhilstPublishingInFlight(true);
+ }
+
+ public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception
+ {
+ doFailoverWhilstPublishingInFlight(false);
+ }
+
+ private void doFailoverWhilstPublishingInFlight(boolean hardKill) throws JMSException, InterruptedException
+ {
+ init(Session.SESSION_TRANSACTED, false);
+
+ final int numberOfMessages = 200;
+
+ final CountDownLatch halfWay = new CountDownLatch(1);
+ final CountDownLatch allDone = new CountDownLatch(1);
+ final AtomicReference<Exception> exception = new AtomicReference<>();
+
+ Runnable producerRunnable = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ Thread.currentThread().setName("ProducingThread");
+
+ try
+ {
+ for(int i=0; i< numberOfMessages; i++)
+ {
+ boolean success = false;
+ while(!success)
+ {
+ try
+ {
+ Message message = _producerSession.createMessage();
+ message.setIntProperty("msgNum", i);
+ _producer.send(message);
+ _producerSession.commit();
+ success = true;
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ // fail - failover should not leave a JMS object in an illegal state
+ throw e;
+ }
+ catch (JMSException e)
+ {
+ // OK we will be failing over
+ _logger.debug("Got JMS exception, probably just failing over", e);
+ }
+ }
+
+ if (i > numberOfMessages / 2 && halfWay.getCount() == 1)
+ {
+ halfWay.countDown();
+ }
+ }
+
+ allDone.countDown();
+ }
+ catch (Exception e)
+ {
+ exception.set(e);
+ }
+ }
+ };
+
+ Thread producerThread = new Thread(producerRunnable);
+ producerThread.start();
+
+ assertTrue("Didn't get to half way within timeout", halfWay.await(30000, TimeUnit.MILLISECONDS));
+
+ if (hardKill)
+ {
+ _logger.debug("Killing the Broker");
+ killBroker(getFailingPort());
+ }
+ else
+ {
+ _logger.debug("Stopping the Broker");
+ stopBroker(getFailingPort());
+ }
+
+ if (exception.get() != null)
+ {
+ _logger.error("Unexpected exception from producer thread", exception.get());
+ }
+ assertNull("Producer thread should not have got an exception", exception.get());
+
+ assertTrue("All producing work was not completed", allDone.await(30000, TimeUnit.MILLISECONDS));
+
+ producerThread.join(30000);
+
+ // Extra work to prove the session still okay
+ assertNotNull(_producerSession.createTemporaryQueue());
+ }
+
+
private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
{
setDelayedFailoverPolicy(5);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org