You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/07/14 14:59:19 UTC

svn commit: r793892 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ test/java/org/apache/activemq/usecases/ test/java/org/apache/activemq/util/

Author: gtully
Date: Tue Jul 14 12:59:19 2009
New Revision: 793892

URL: http://svn.apache.org/viewvc?rev=793892&view=rev
Log:
Resolve some issues with duplicate expiry caused by the expiry task executing concurrently with dispatch/acks, etc. Some more tests for stats are included - https://issues.apache.org/activemq/browse/AMQ-1112

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/Wait.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=793892&r1=793891&r2=793892&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Jul 14 12:59:19 2009
@@ -222,9 +222,6 @@
                         // Don't remove the nodes until we are committed.  
                         if (!context.isInTransaction()) {
                             dequeueCounter++;
-                            if (!this.getConsumerInfo().isBrowser()) {
-                                node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
-                            }
                             node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                             removeList.add(node);
                         } else {
@@ -238,7 +235,6 @@
                                             synchronized(dispatchLock) {
                                                 dequeueCounter++;
                                                 dispatched.remove(node);
-                                                node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                                                 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                                                 prefetchExtension--;
                                             }
@@ -287,7 +283,6 @@
                     MessageId messageId = node.getMessageId();
                     if (ack.getLastMessageId().equals(messageId)) {
                         // this should never be within a transaction
-                        node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
                         node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                         destination = node.getRegionDestination();
                         acknowledge(context, ack, node);
@@ -303,16 +298,12 @@
                 int index = 0;
                 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                     final MessageReference node = iter.next();
-                    if (hasNotAlreadyExpired(node)) {
-                        if (node.isExpired()) {
+                    if (node.isExpired()) {
+                        if (broker.isExpired(node)) {
                             node.getRegionDestination().messageExpired(context, this, node);
-                            dispatched.remove(node);
-                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                         }
-                    } else {
-                        // already expired
                         dispatched.remove(node);
-                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();    
+                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                     }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
                         prefetchExtension = Math.max(prefetchExtension, index + 1);
@@ -374,9 +365,6 @@
                     if (inAckRange) {
                         sendToDLQ(context, node);
                         node.getRegionDestination().getDestinationStatistics()
-                                .getDequeues().increment();
-
-                        node.getRegionDestination().getDestinationStatistics()
                                 .getInflight().increment();
 
                         removeList.add(node);
@@ -418,16 +406,6 @@
         }
     }
 
-    private boolean hasNotAlreadyExpired(MessageReference node) {
-        boolean hasNotExpired = true;
-        try {
-            hasNotExpired = node.getMessage().getProperty(RegionBroker.ORIGINAL_EXPIRATION) == null;
-        } catch (IOException e) {
-            LOG.warn("failed to determine value message property " + RegionBroker.ORIGINAL_EXPIRATION + " for " + node, e);
-        }
-        return hasNotExpired;
-    }
-
     /**
      * Checks an ack versus the contents of the dispatched list.
      * 
@@ -610,7 +588,9 @@
                                     if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
                                         //increment number to dispatch
                                         numberToDispatch++;
-                                        node.getRegionDestination().messageExpired(context, this, node);
+                                        if (broker.isExpired(node)) {
+                                            node.getRegionDestination().messageExpired(context, this, node);
+                                        }
                                         continue;
                                     }
                                     dispatch(node);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=793892&r1=793891&r2=793892&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jul 14 12:59:19 2009
@@ -760,7 +760,9 @@
                     addAll(pagedInPendingDispatch, l, max, toExpire);
                     for (MessageReference ref : toExpire) {
                         pagedInPendingDispatch.remove(ref);
-                        messageExpired(connectionContext, ref);
+                        if (broker.isExpired(ref)) {
+                            messageExpired(connectionContext, ref);
+                        }
                     }
                 }
                 toExpire.clear();
@@ -768,7 +770,13 @@
                     addAll(pagedInMessages.values(), l, max, toExpire);   
                 }
                 for (MessageReference ref : toExpire) {
-                    messageExpired(connectionContext, ref);
+                    if (broker.isExpired(ref)) {
+                        messageExpired(connectionContext, ref);
+                    } else {
+                        synchronized (pagedInMessages) {
+                            pagedInMessages.remove(ref.getMessageId());
+                        }
+                    }
                 }
                 
                 if (l.size() < getMaxBrowsePageSize()) {
@@ -805,7 +813,7 @@
         for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext()
                 && l.size() < getMaxBrowsePageSize();) {
             QueueMessageReference ref = i.next();
-            if (broker.isExpired(ref)) {
+            if (ref.isExpired()) {
                 toExpire.add(ref);
             } else if (l.contains(ref.getMessage()) == false) {
                 l.add(ref.getMessage());
@@ -1224,6 +1232,7 @@
         // This sends the ack the the journal..
         if (!ack.isInTransaction()) {
             acknowledge(context, sub, ack, reference);
+            getDestinationStatistics().getDequeues().increment();
             dropMessage(reference);
         } else {
             try {
@@ -1232,6 +1241,7 @@
                 context.getTransaction().addSynchronization(new Synchronization() {
                 
                     public void afterCommit() throws Exception {
+                        getDestinationStatistics().getDequeues().increment();
                         dropMessage(reference);
                         wakeup();
                     }
@@ -1264,11 +1274,10 @@
     }
     
     public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
-        if (LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {      
             LOG.debug("message expired: " + reference);
         }
         broker.messageExpired(context, reference);
-        destinationStatistics.getDequeues().increment();
         destinationStatistics.getExpired().increment();
         try {
             removeMessage(context,subs,(QueueMessageReference)reference);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=793892&r1=793891&r2=793892&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Tue Jul 14 12:59:19 2009
@@ -45,6 +45,12 @@
      * @throws IOException
      */
     protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
+        if (n.isExpired()) {
+            if (!broker.isExpired(n)) {
+                LOG.info("ignoring ack " + ack + ", for already expired message: " + n);
+                return;
+            }
+        }
         final Destination q = n.getRegionDestination();
         final QueueMessageReference node = (QueueMessageReference)n;
         final Queue queue = (Queue)q;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=793892&r1=793891&r2=793892&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Jul 14 12:59:19 2009
@@ -664,7 +664,30 @@
     }
 
     public boolean isExpired(MessageReference messageReference) {
-        return messageReference.isExpired();
+        boolean expired = false;
+        if (messageReference.isExpired()) {
+            try {
+                // prevent duplicate expiry processing
+                Message message = messageReference.getMessage();
+                synchronized (message) {
+                    expired = stampAsExpired(message);
+                }
+            } catch (IOException e) {
+                LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
+            }
+        }
+        return expired;
+    }
+   
+    private boolean stampAsExpired(Message message) throws IOException {
+        boolean stamped=false;
+        if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
+            long expiration=message.getExpiration();
+            message.setExpiration(0);
+            message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
+            stamped = true;
+        }
+        return stamped;
     }
 
     public void messageExpired(ConnectionContext context, MessageReference node) {
@@ -679,7 +702,8 @@
 		try{
 			if(node!=null){
 				Message message=node.getMessage();
-				if(message!=null&&node.getRegionDestination()!=null){
+				stampAsExpired(message);
+				if(message!=null && node.getRegionDestination()!=null){
 					DeadLetterStrategy deadLetterStrategy=node
 					        .getRegionDestination().getDeadLetterStrategy();
 					if(deadLetterStrategy!=null){
@@ -688,10 +712,6 @@
 						        // message may be inflight to other subscriptions so do not modify
 						        message = message.copy();
 						    }
-							long expiration=message.getExpiration();
-							message.setExpiration(0);
-							message.setProperty(ORIGINAL_EXPIRATION,new Long(
-							        expiration));
 							if(!message.isPersistent()){
 							    message.setPersistent(true);
 							    message.setProperty("originalDeliveryMode",

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=793892&r1=793891&r2=793892&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Tue Jul 14 12:59:19 2009
@@ -611,14 +611,6 @@
         }
     }
 
-    public boolean isExpired() {
-        long expireTime = this.getExpiration();
-        if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
-            return true;
-        }
-        return false;
-    }
-
     public Callback getAcknowledgeCallback() {
         return acknowledgeCallback;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=793892&r1=793891&r2=793892&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Tue Jul 14 12:59:19 2009
@@ -88,6 +88,7 @@
     private transient ActiveMQConnection connection;
     private transient org.apache.activemq.broker.region.Destination regionDestination;
     private transient MemoryUsage memoryUsage;
+    private transient boolean expired;
 
     private BrokerId[] brokerPath;
     private BrokerId[] cluster;
@@ -338,6 +339,9 @@
 
     public void setExpiration(long expiration) {
         this.expiration = expiration;
+        if (this.expiration > 0) {
+            expired = false;
+        }
     }
 
     /**
@@ -435,11 +439,13 @@
     }
 
     public boolean isExpired() {
-        long expireTime = getExpiration();
-        if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
-            return true;
+        if (!expired) {
+            long expireTime = getExpiration();
+            if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
+                expired = true;
+            }
         }
-        return false;
+        return expired;
     }
 
     public boolean isAdvisory() {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=793892&r1=793891&r2=793892&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Tue Jul 14 12:59:19 2009
@@ -41,6 +41,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -107,7 +108,7 @@
             public void run() {
                 try {
                 	int i = 0;
-                	while (i++ < 30000) {
+                	while (i++ < 10000) {
                 		producer.send(session.createTextMessage("test"));
                 	}
                 	producer.close();
@@ -123,21 +124,41 @@
         producingThread.join();
         session.close();
         
-        Thread.sleep(2000);
-                
-        DestinationViewMBean view = createView(destination);
+        final DestinationViewMBean view = createView(destination);
+        
+        // wait for all to inflight to expire
+        assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return view.getInFlightCount() == 0;
+            }           
+        }));
+        assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount());
+        
         LOG.info("Stats: received: "  + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
                 + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
         
-        assertEquals("got what did not expire", received.get(), view.getDequeueCount() - view.getExpiredCount());
+        // wait for all sent to get delivered and expire
+        assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                long oldEnqueues = view.getEnqueueCount();
+                Thread.sleep(200);
+                LOG.info("Stats: received: "  + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
+                        + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
+                return oldEnqueues == view.getEnqueueCount();
+            }           
+        }, 60*1000));
         
-        long expiry = System.currentTimeMillis() + 30000;
-        while (view.getInFlightCount() > 0 && System.currentTimeMillis() < expiry) {
-            Thread.sleep(500);
-        }
-        LOG.info("Stats: received: "  + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
+
+        LOG.info("Stats: received: "  + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
                 + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
-        assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount());
+        
+        assertTrue("got at least what did not expire", received.get() >= view.getDequeueCount() - view.getExpiredCount());
+        
+        assertTrue("all messages expired - queue size gone to zero " + view.getQueueSize(), Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return view.getQueueSize() == 0;
+            }
+        }));
 	}
 
 	
@@ -229,6 +250,7 @@
             defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
         }
         defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
+        defaultPolicy.setMaxExpirePageSize(1200);
         PolicyMap policyMap = new PolicyMap();
         policyMap.setDefaultEntry(defaultPolicy);
         broker.setDestinationPolicy(policyMap);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=793892&r1=793891&r2=793892&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Tue Jul 14 12:59:19 2009
@@ -39,6 +39,7 @@
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -47,7 +48,6 @@
 
     private static final Log LOG = LogFactory.getLog(ExpiredMessagesWithNoConsumerTest.class);
 
-    private static final int expiryPeriod = 1000;
     
 	BrokerService broker;
 	Connection connection;
@@ -81,8 +81,8 @@
 
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
-        defaultEntry.setExpireMessagesPeriod(expiryPeriod);
-        defaultEntry.setMaxExpirePageSize(200);
+        defaultEntry.setExpireMessagesPeriod(100);
+        defaultEntry.setMaxExpirePageSize(800);
 
         if (memoryLimit) {
             // so memory is not consumed by DLQ turn if off
@@ -106,11 +106,11 @@
 		connection = factory.createConnection();
 		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 		producer = session.createProducer(destination);
-		producer.setTimeToLive(100);
+		producer.setTimeToLive(1000);
 		connection.start();
 		final long sendCount = 2000;		
 		
-		Thread producingThread = new Thread("Producing Thread") {
+		final Thread producingThread = new Thread("Producing Thread") {
             public void run() {
                 try {
                 	int i = 0;
@@ -130,21 +130,27 @@
 		
 		producingThread.start();
 		
-		final long expiry = System.currentTimeMillis() + 20*1000;
-		while (producingThread.isAlive() && expiry > System.currentTimeMillis()) {
-		    producingThread.join(1000);
-		}
-        
-		assertTrue("producer completed within time ", !producingThread.isAlive());
+		assertTrue("producer completed within time", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                producingThread.join(1000);
+                return !producingThread.isAlive();
+            }
+		}));
 		
-		Thread.sleep(3*expiryPeriod);
-        DestinationViewMBean view = createView(destination);
-        assertEquals("All sent have expired ", sendCount, view.getExpiredCount());
+        final DestinationViewMBean view = createView(destination);
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return sendCount == view.getExpiredCount();
+            }
+        });
+        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                + ", size= " + view.getQueueSize());
+        assertEquals("All sent have expired", sendCount, view.getExpiredCount());
 	}
-
-	
     
-    public void testExpiredMessagesWitVerySlowConsumer() throws Exception {
+	// first ack delivered after expiry
+    public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
         createBroker();  
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
         connection = factory.createConnection();
@@ -153,7 +159,7 @@
         final int ttl = 4000;
         producer.setTimeToLive(ttl);
         
-        final long sendCount = 1001; 
+        final long sendCount = 1500; 
         final CountDownLatch receivedOneCondition = new CountDownLatch(1);
         final CountDownLatch waitCondition = new CountDownLatch(1);
         
@@ -165,6 +171,7 @@
                     LOG.info("Got my message: " + message);
                     receivedOneCondition.countDown();
                     waitCondition.await(60, TimeUnit.SECONDS);
+                    LOG.info("acking message: " + message);
                     message.acknowledge();
                 } catch (Exception e) {
                     e.printStackTrace();
@@ -176,7 +183,7 @@
         connection.start();
       
         
-        Thread producingThread = new Thread("Producing Thread") {
+        final Thread producingThread = new Thread("Producing Thread") {
             public void run() {
                 try {
                     int i = 0;
@@ -195,30 +202,46 @@
         };
         
         producingThread.start();
+        assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS));
         
-        final long expiry = System.currentTimeMillis() + 20*1000;
-        while (producingThread.isAlive() && expiry > System.currentTimeMillis()) {
-            producingThread.join(1000);
-        }
-        
-        assertTrue("got one message", receivedOneCondition.await(10, TimeUnit.SECONDS));
-        assertTrue("producer completed within time ", !producingThread.isAlive());
-        
-        Thread.sleep(2 * Math.max(ttl, expiryPeriod));
-        DestinationViewMBean view = createView(destination);
+        assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                producingThread.join(1000);
+                return !producingThread.isAlive();
+            }      
+        }));
+             
+        final DestinationViewMBean view = createView(destination);
             
-        assertEquals("all dispatched up to default prefetch ", 1000, view.getDispatchCount());
-        assertEquals("All sent save one have expired ", sendCount, view.getExpiredCount());     
+        assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return 1000 == view.getDispatchCount();
+            }
+        }));
+        assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return sendCount == view.getExpiredCount();
+            }
+        }));     
         
+        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                + ", size= " + view.getQueueSize());
         
         // let the ack happen
         waitCondition.countDown();
-     
-        Thread.sleep(Math.max(ttl, expiryPeriod));
-        
-        assertEquals("all sent save one have expired ", sendCount, view.getExpiredCount());
         
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return 0 == view.getInFlightCount();
+            }
+        });
+        LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                + ", size= " + view.getQueueSize());
         assertEquals("prefetch gets back to 0 ", 0, view.getInFlightCount());
+        assertEquals("size gets back to 0 ", 0, view.getQueueSize());
+        assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
         
         consumer.close();
         LOG.info("done: " + getName());

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/Wait.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/Wait.java?rev=793892&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/Wait.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/Wait.java Tue Jul 14 12:59:19 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.util;
+
+
+public class Wait { // Test helper: polls a Condition until it reports true or a timeout elapses.
+    /** Default timeout in milliseconds (30 seconds), used by the single-argument waitFor. */
+    public static final long MAX_WAIT_MILLIS = 30*1000;
+    /** A check polled by waitFor; returning true ends the wait, and any exception thrown propagates to the caller of waitFor. */
+    public interface Condition {
+        boolean isSatisified() throws Exception;
+    }
+    /** Waits on the condition with the default MAX_WAIT_MILLIS timeout and returns the two-argument overload's result. */
+    public static boolean waitFor(Condition condition) throws Exception {
+        return waitFor(condition, MAX_WAIT_MILLIS);
+    }
+    /** Polls the condition, pausing one second between checks, until it is satisfied or duration milliseconds have passed; returns the condition's final evaluation. */
+    public static boolean waitFor(final Condition condition, final long duration) throws Exception {
+        final long expiry = System.currentTimeMillis() + duration; // absolute wall-clock deadline
+        while (!condition.isSatisified() && System.currentTimeMillis() < expiry) {
+            Thread.sleep(1000); // fixed one-second pause; the deadline is only re-checked after each pause
+        }   
+        return condition.isSatisified(); // the condition is evaluated once more here, even if the loop exited because it was already true
+    }  
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/Wait.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/Wait.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date