You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/06/01 16:33:06 UTC
svn commit: r1682915 -
/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
Author: orudyy
Date: Mon Jun 1 14:33:06 2015
New Revision: 1682915
URL: http://svn.apache.org/r1682915
Log:
NO-JIRA: Improve relaibility of test PriorityQueueTest#testReleaseMessageThatBecomesExpiredIsNotRedelivered by replacing hard-coded Thread#sleep intervals with conditional waiting logic
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1682915&r1=1682914&r2=1682915&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Mon Jun 1 14:33:06 2015
@@ -37,7 +37,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.qpid.server.util.StateChangeListener;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -292,6 +295,22 @@ abstract class AbstractQueueTestBase ext
public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
+ final CountDownLatch sendIndicator = new CountDownLatch(1);
+ _consumerTarget = new MockConsumer()
+ {
+ @Override
+ public long send(ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ {
+ try
+ {
+ return super.send(consumer, entry, batch);
+ }
+ finally
+ {
+ sendIndicator.countDown();
+ }
+ }
+ };
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
@@ -308,26 +327,46 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageA, postEnqueueAction, null);
- int subFlushWaitTime = 150;
- Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
+ assertTrue("Message was not sent during expected time interval", sendIndicator.await(5000, TimeUnit.MILLISECONDS));
- assertEquals("Unexpected total number of messages sent to consumer",
- 1,
- _consumerTarget.getMessages().size());
- assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
+ assertEquals("Unexpected total number of messages sent to consumer", 1, _consumerTarget.getMessages().size());
+ QueueEntry queueEntry = queueEntries.get(0);
+
+ final CountDownLatch dequeueIndicator = new CountDownLatch(1);
+ queueEntry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
+ {
+ @Override
+ public void stateChanged(MessageInstance object, MessageInstance.State oldState, MessageInstance.State newState)
+ {
+ if (newState == MessageInstance.State.DEQUEUED)
+ {
+ dequeueIndicator.countDown();
+ }
+ }
+ });
+ assertFalse("Redelivery flag should not be set", queueEntry.isRedelivered());
/* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
- Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
- queueEntries.get(0).release();
+ while(!queueEntry.expired() && System.currentTimeMillis() <= expiration )
+ {
+ Thread.sleep(10);
+ }
- Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
+ assertTrue("Expecting the queue entry to be now expired", queueEntry.expired());
+ queueEntry.release();
- assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
- assertEquals("Total number of messages sent should not have changed",
- 1,
- _consumerTarget.getMessages().size());
- assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed",
+ assertTrue("Message was not de-queued due to expiration", dequeueIndicator.await(5000, TimeUnit.MILLISECONDS));
+
+ assertEquals("Total number of messages sent should not have changed", 1, _consumerTarget.getMessages().size());
+ assertFalse("Redelivery flag should not be set", queueEntry.isRedelivered());
+
+ // QueueContext#_releasedEntry is updated after notification, thus, we need to make sure that it is updated
+ long waitLoopLimit = 10;
+ while(_consumer.getQueueContext().getReleasedEntry() != null && waitLoopLimit-- > 0 )
+ {
+ Thread.sleep(10);
+ }
+ assertNull("releasedEntry should be cleared after requeue processed:" + _consumer.getQueueContext().getReleasedEntry(),
_consumer.getQueueContext().getReleasedEntry());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org