You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/11/04 19:37:00 UTC
svn commit: r1031136 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Author: gtully
Date: Thu Nov 4 18:37:00 2010
New Revision: 1031136
URL: http://svn.apache.org/viewvc?rev=1031136&view=rev
Log:
starter test case for https://issues.apache.org/activemq/browse/AMQ-2908 - it works though
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=1031136&r1=1031135&r2=1031136&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Thu Nov 4 18:37:00 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.usecases;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Message;
@@ -275,6 +276,132 @@ public class ExpiredMessagesWithNoConsum
LOG.info("done: " + getName());
}
+ public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception {
+ createBroker();
+ final long queuePrefetch = 600;
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
+ connection = factory.createConnection();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ producer = session.createProducer(destination);
+ final int ttl = 4000;
+ producer.setTimeToLive(ttl);
+
+ final long sendCount = 1500;
+ final CountDownLatch receivedOneCondition = new CountDownLatch(1);
+ final CountDownLatch waitCondition = new CountDownLatch(1);
+ final AtomicLong received = new AtomicLong();
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+
+ public void onMessage(Message message) {
+ try {
+ LOG.info("Got my message: " + message);
+ receivedOneCondition.countDown();
+ received.incrementAndGet();
+ waitCondition.await(60, TimeUnit.SECONDS);
+ LOG.info("acking message: " + message);
+ message.acknowledge();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.toString());
+ }
+ }
+ });
+
+ connection.start();
+
+
+ final Thread producingThread = new Thread("Producing Thread") {
+ public void run() {
+ try {
+ int i = 0;
+ long tStamp = System.currentTimeMillis();
+ while (i++ < sendCount) {
+ producer.send(session.createTextMessage("test"));
+ if (i%100 == 0) {
+ LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
+ tStamp = System.currentTimeMillis() ;
+ }
+ }
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ }
+ }
+ };
+
+ producingThread.start();
+ assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS));
+
+ assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ producingThread.join(1000);
+ return !producingThread.isAlive();
+ }
+ }));
+
+ final DestinationViewMBean view = createView(destination);
+
+ assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return queuePrefetch == view.getDispatchCount();
+ }
+ }));
+ assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return sendCount == view.getExpiredCount();
+ }
+ }));
+
+ LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ + ", size= " + view.getQueueSize());
+
+ // let the ack happen
+ waitCondition.countDown();
+
+ Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ // consumer ackLater(delivery ack for expired messages) is based on half the prefetch value
+ // which will leave half of the prefetch pending till consumer close
+ return (queuePrefetch/2) -1 == view.getInFlightCount();
+ }
+ });
+ LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ + ", size= " + view.getQueueSize());
+
+
+ assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
+ assertEquals("size gets back to 0 ", 0, view.getQueueSize());
+ assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
+
+
+ // produce some more
+ producer.setTimeToLive(0);
+ for (int i=0; i<sendCount; i++) {
+ producer.send(session.createTextMessage("test-" + i));
+ }
+
+ Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return received.get() >= sendCount;
+ }
+ });
+
+ consumer.close();
+
+ Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return 0 == view.getInFlightCount();
+ }
+ });
+ assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount());
+
+ LOG.info("done: " + getName());
+ }
+
+
+
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
String domain = "org.apache.activemq";
ObjectName name;