You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/11/04 19:37:00 UTC

svn commit: r1031136 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Author: gtully
Date: Thu Nov  4 18:37:00 2010
New Revision: 1031136

URL: http://svn.apache.org/viewvc?rev=1031136&view=rev
Log:
starter test case for https://issues.apache.org/activemq/browse/AMQ-2908 - it works though

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=1031136&r1=1031135&r2=1031136&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Thu Nov  4 18:37:00 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.usecases;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -275,6 +276,132 @@ public class ExpiredMessagesWithNoConsum
         LOG.info("done: " + getName());
     }
 
+    public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception {
+        createBroker();
+        final long queuePrefetch = 600;
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
+        connection = factory.createConnection();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        producer = session.createProducer(destination);
+        final int ttl = 4000;
+        producer.setTimeToLive(ttl);
+
+        final long sendCount = 1500;
+        final CountDownLatch receivedOneCondition = new CountDownLatch(1);
+        final CountDownLatch waitCondition = new CountDownLatch(1);
+        final AtomicLong received = new AtomicLong();
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+
+            public void onMessage(Message message) {
+                try {
+                    LOG.info("Got my message: " + message);
+                    receivedOneCondition.countDown();
+                    received.incrementAndGet();
+                    waitCondition.await(60, TimeUnit.SECONDS);
+                    LOG.info("acking message: " + message);
+                    message.acknowledge();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    fail(e.toString());
+                }
+            }
+        });
+
+        connection.start();
+
+
+        final Thread producingThread = new Thread("Producing Thread") {
+            public void run() {
+                try {
+                    int i = 0;
+                    long tStamp = System.currentTimeMillis();
+                    while (i++ < sendCount) {
+                        producer.send(session.createTextMessage("test"));
+                        if (i%100 == 0) {
+                            LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100)  + "m/ms");
+                            tStamp = System.currentTimeMillis() ;
+                        }
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
+        producingThread.start();
+        assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS));
+
+        assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                producingThread.join(1000);
+                return !producingThread.isAlive();
+            }
+        }));
+
+        final DestinationViewMBean view = createView(destination);
+
+        assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return queuePrefetch == view.getDispatchCount();
+            }
+        }));
+        assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return sendCount == view.getExpiredCount();
+            }
+        }));
+
+        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                + ", size= " + view.getQueueSize());
+
+        // let the ack happen
+        waitCondition.countDown();
+
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                // consumer ackLater(delivery ack for expired messages) is based on half the prefetch value
+                // which will leave half of the prefetch pending till consumer close
+                return (queuePrefetch/2) -1 == view.getInFlightCount();
+            }
+        });
+        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                + ", size= " + view.getQueueSize());
+
+
+        assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
+        assertEquals("size gets back to 0 ", 0, view.getQueueSize());
+        assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
+
+
+        // produce some more
+        producer.setTimeToLive(0);
+        for (int i=0; i<sendCount; i++) {
+            producer.send(session.createTextMessage("test-" + i));
+        }
+
+        Wait.waitFor(new Wait.Condition() {
+             public boolean isSatisified() throws Exception {
+                 return received.get() >= sendCount;
+             }
+         });
+
+        consumer.close();
+
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return 0 == view.getInFlightCount();
+            }
+        });
+        assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount());
+
+        LOG.info("done: " + getName());
+    }
+
+
+
 	protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
         String domain = "org.apache.activemq";
         ObjectName name;