You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/06/01 16:33:06 UTC

svn commit: r1682915 - /qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java

Author: orudyy
Date: Mon Jun  1 14:33:06 2015
New Revision: 1682915

URL: http://svn.apache.org/r1682915
Log:
NO-JIRA: Improve reliability of test PriorityQueueTest#testReleaseMessageThatBecomesExpiredIsNotRedelivered by replacing hard-coded Thread#sleep intervals with conditional waiting logic

Modified:
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1682915&r1=1682914&r2=1682915&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Mon Jun  1 14:33:06 2015
@@ -37,7 +37,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.qpid.server.util.StateChangeListener;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -292,6 +295,22 @@ abstract class AbstractQueueTestBase ext
     public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception
     {
         ServerMessage messageA = createMessage(new Long(24));
+        final CountDownLatch sendIndicator = new CountDownLatch(1);
+        _consumerTarget = new MockConsumer()
+        {
+            @Override
+            public long send(ConsumerImpl consumer, MessageInstance entry, boolean batch)
+            {
+                try
+                {
+                    return super.send(consumer, entry, batch);
+                }
+                finally
+                {
+                    sendIndicator.countDown();
+                }
+            }
+        };
 
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
                                            EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
@@ -308,26 +327,46 @@ abstract class AbstractQueueTestBase ext
 
         _queue.enqueue(messageA, postEnqueueAction, null);
 
-        int subFlushWaitTime = 150;
-        Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
+        assertTrue("Message was not sent during expected time interval", sendIndicator.await(5000, TimeUnit.MILLISECONDS));
 
-        assertEquals("Unexpected total number of messages sent to consumer",
-                     1,
-                     _consumerTarget.getMessages().size());
-        assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
+        assertEquals("Unexpected total number of messages sent to consumer", 1, _consumerTarget.getMessages().size());
+        QueueEntry queueEntry = queueEntries.get(0);
+
+        final CountDownLatch dequeueIndicator = new CountDownLatch(1);
+        queueEntry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
+        {
+            @Override
+            public void stateChanged(MessageInstance object, MessageInstance.State oldState, MessageInstance.State newState)
+            {
+                if (newState == MessageInstance.State.DEQUEUED)
+                {
+                    dequeueIndicator.countDown();
+                }
+            }
+        });
+        assertFalse("Redelivery flag should not be set", queueEntry.isRedelivered());
 
         /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
-        Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
-        queueEntries.get(0).release();
+        while(!queueEntry.expired() && System.currentTimeMillis() <= expiration )
+        {
+            Thread.sleep(10);
+        }
 
-        Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
+        assertTrue("Expecting the queue entry to be now expired", queueEntry.expired());
+        queueEntry.release();
 
-        assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
-        assertEquals("Total number of messages sent should not have changed",
-                     1,
-                     _consumerTarget.getMessages().size());
-        assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
-        assertNull("releasedEntry should be cleared after requeue processed",
+        assertTrue("Message was not de-queued due to expiration", dequeueIndicator.await(5000, TimeUnit.MILLISECONDS));
+
+        assertEquals("Total number of messages sent should not have changed", 1, _consumerTarget.getMessages().size());
+        assertFalse("Redelivery flag should not be set", queueEntry.isRedelivered());
+
+        // QueueContext#_releasedEntry is updated after notification, thus, we need to make sure that it is updated
+        long waitLoopLimit = 10;
+        while(_consumer.getQueueContext().getReleasedEntry() != null && waitLoopLimit-- > 0 )
+        {
+            Thread.sleep(10);
+        }
+        assertNull("releasedEntry should be cleared after requeue processed:" +  _consumer.getQueueContext().getReleasedEntry(),
                    _consumer.getQueueContext().getReleasedEntry());
 
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org