You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/10/29 10:26:38 UTC

svn commit: r1635076 - /qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java

Author: kwall
Date: Wed Oct 29 09:26:37 2014
New Revision: 1635076

URL: http://svn.apache.org/r1635076
Log:
QPID-6192: [Java Broker] Add supporting test case guarding case when failover occurs when busy

Modified:
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java

Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1635076&r1=1635075&r2=1635076&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Wed Oct 29 09:26:37 2014
@@ -57,6 +57,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Test suite to test all possible failover corner cases
@@ -713,6 +714,131 @@ public class FailoverBehaviourTest exten
         browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
     }
 
+    /**
+     * Verifies that publishing resumes after failover when the broker is killed while a
+     * producer thread is still sending and committing messages.
+     */
+    public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception
+    {
+        doFailoverWhilstPublishingInFlight(true);
+    }
+
+    /**
+     * Verifies that publishing resumes after failover when the broker is stopped while a
+     * producer thread is still sending and committing messages.
+     */
+    public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception
+    {
+        doFailoverWhilstPublishingInFlight(false);
+    }
+
+    /**
+     * Publishes {@code numberOfMessages} transacted messages from a background thread, takes the
+     * failing broker down once the producer is past the half-way point, and then checks that the
+     * producer finished without an exception and that the session is still usable.
+     *
+     * @param hardKill {@code true} to call {@code killBroker}, {@code false} to call {@code stopBroker}
+     */
+    private void doFailoverWhilstPublishingInFlight(boolean hardKill) throws JMSException, InterruptedException
+    {
+        init(Session.SESSION_TRANSACTED, false);
+
+        final int numberOfMessages = 200;
+
+        final CountDownLatch halfWay = new CountDownLatch(1);
+        final CountDownLatch producerFinished = new CountDownLatch(1);
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+
+        Runnable producerRunnable = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    for(int i=0; i< numberOfMessages; i++)
+                    {
+                        boolean success = false;
+                        while(!success)
+                        {
+                            try
+                            {
+                                Message message = _producerSession.createMessage();
+                                message.setIntProperty("msgNum", i);
+                                _producer.send(message);
+                                _producerSession.commit();
+                                success = true;
+                            }
+                            catch (javax.jms.IllegalStateException e)
+                            {
+                                // fail - failover should not leave a JMS object in an illegal state
+                                throw e;
+                            }
+                            catch (JMSException e)
+                            {
+                                // OK we will be failing over; the same message is retried
+                                _logger.debug("Got JMS exception, probably just failing over", e);
+                            }
+                        }
+
+                        if (i > numberOfMessages / 2)
+                        {
+                            // countDown() has no effect once the latch has reached zero
+                            halfWay.countDown();
+                        }
+                    }
+
+                    producerFinished.countDown();
+                }
+                catch (Exception e)
+                {
+                    exception.set(e);
+                    // release both latches so the test reports the failure instead of timing out
+                    halfWay.countDown();
+                    producerFinished.countDown();
+                }
+            }
+        };
+
+        Thread producerThread = new Thread(producerRunnable, "ProducingThread");
+        producerThread.start();
+
+        assertTrue("Didn't get to half way within timeout", halfWay.await(30000, TimeUnit.MILLISECONDS));
+        assertNoProducerException(exception);
+
+        if (hardKill)
+        {
+            _logger.debug("Killing the Broker");
+            killBroker(getFailingPort());
+        }
+        else
+        {
+            _logger.debug("Stopping the Broker");
+            stopBroker(getFailingPort());
+        }
+
+        assertTrue("All producing work was not completed", producerFinished.await(30000, TimeUnit.MILLISECONDS));
+        // checked only after the producer has finished, so a failure caused by the failover is not missed
+        assertNoProducerException(exception);
+
+        producerThread.join(30000);
+
+        // Extra work to prove the session still okay
+        assertNotNull(_producerSession.createTemporaryQueue());
+    }
+
+    /** Fails the test, after logging the cause, if the producer thread recorded an exception. */
+    private void assertNoProducerException(AtomicReference<Exception> exception)
+    {
+        Exception failure = exception.get();
+        if (failure != null)
+        {
+            _logger.error("Unexpected exception from producer thread", failure);
+        }
+        assertNull("Producer thread should not have got an exception", failure);
+    }
+
+
     private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
     {
         setDelayedFailoverPolicy(5);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org