You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/07/03 14:02:08 UTC

svn commit: r790880 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/jmx/ src/main/java/org/apache/activemq/broker/region/ src/main/java/org/apache/activemq/broker/region/policy/ src/main/java/org/apache/activemq/usage/ s...

Author: gtully
Date: Fri Jul  3 12:02:07 2009
New Revision: 790880

URL: http://svn.apache.org/viewvc?rev=790880&view=rev
Log:
First cut of a resolution for deterministic message expiry (https://issues.apache.org/activemq/browse/AMQ-1112): the default expiry-check period is 30 seconds, and a destination policy entry allows the period to be changed or the check to be turned off (0).

Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Fri Jul  3 12:02:07 2009
@@ -451,8 +451,6 @@
             <exclude>**/AMQDeadlockTest3.*</exclude>
             <exclude>**/AMQ1936Test.*</exclude>
             
-            <!-- excluding it until the issue is fixed (AMQ-1112), so we can have successful builds -->
-            <exclude>**/MessageExpirationReaperTest.*</exclude>
           </excludes>
         </configuration>
       </plugin>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Fri Jul  3 12:02:07 2009
@@ -89,6 +89,10 @@
     public long getInFlightCount() {
         return destination.getDestinationStatistics().getInflight().getCount();
     }
+    
+    public long getExpiredCount() {
+        return destination.getDestinationStatistics().getExpired().getCount();
+    }
 
     public long getConsumerCount() {
         return destination.getDestinationStatistics().getConsumers().getCount();
@@ -363,4 +367,5 @@
         }
         return answer;
     }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Fri Jul  3 12:02:07 2009
@@ -73,6 +73,14 @@
      */
     long getInFlightCount();
 
+    
+    /**
+     * Returns the number of messages that have expired
+     * 
+     * @return The number of messages that have expired
+     */
+    long getExpiredCount();
+
     /**
      * Returns the number of consumers subscribed this destination.
      * 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Jul  3 12:02:07 2009
@@ -44,6 +44,7 @@
      */
     public static final int MAX_PAGE_SIZE = 200;
     public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
+    public static final long EXPIRE_MESSAGE_PERIOD = 30*1000;
     protected final ActiveMQDestination destination;
     protected final Broker broker;
     protected final MessageStore store;
@@ -69,6 +70,8 @@
     protected final BrokerService brokerService;
     protected final Broker regionBroker;
     protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
+    protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
+    private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
 
     /**
      * @param broker
@@ -213,7 +216,23 @@
     public void setMaxBrowsePageSize(int maxPageSize) {
         this.maxBrowsePageSize = maxPageSize;
     }
+    
+    public int getMaxExpirePageSize() {
+        return this.maxExpirePageSize;
+    }
+
+    public void setMaxExpirePageSize(int maxPageSize) {
+        this.maxExpirePageSize  = maxPageSize;
+    }
 
+    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
+        this.expireMessagesPeriod = expireMessagesPeriod;
+    }
+
+    public long getExpireMessagesPeriod() {
+        return expireMessagesPeriod;
+    }
+    
     public boolean isUseCache() {
         return useCache;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java Fri Jul  3 12:02:07 2009
@@ -21,7 +21,6 @@
 import org.apache.activemq.management.PollCountStatisticImpl;
 import org.apache.activemq.management.StatsImpl;
 import org.apache.activemq.management.TimeStatisticImpl;
-import org.apache.tools.ant.taskdefs.condition.IsReference;
 
 /**
  * The J2EE Statistics for the a Destination.
@@ -38,6 +37,7 @@
     protected PollCountStatisticImpl messagesCached;
     protected CountStatisticImpl dispatched;
     protected CountStatisticImpl inflight;
+    protected CountStatisticImpl expired;
     protected TimeStatisticImpl processTime;
 
     public DestinationStatistics() {
@@ -46,6 +46,8 @@
         dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
         dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
         inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
+        expired = new CountStatisticImpl("expired", "The number of messages that have expired");
+        
         consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
         consumers.setDoReset(false);
         producers = new CountStatisticImpl("producers", "The number of producers that that are publishing messages to the destination");
@@ -57,6 +59,7 @@
         addStatistic("dispatched", dispatched);
         addStatistic("dequeues", dequeues);
         addStatistic("inflight", inflight);
+        addStatistic("expired", expired);  
         addStatistic("consumers", consumers);
         addStatistic("producers", producers);
         addStatistic("messages", messages);
@@ -76,6 +79,10 @@
         return inflight;
     }
 
+    public CountStatisticImpl getExpired() {
+        return expired;
+    }
+
     public CountStatisticImpl getConsumers() {
         return consumers;
     }
@@ -111,6 +118,7 @@
             dequeues.reset();
             dispatched.reset();
             inflight.reset();
+            expired.reset();
         }
     }
 
@@ -120,6 +128,7 @@
         dispatched.setEnabled(enabled);
         dequeues.setEnabled(enabled);
         inflight.setEnabled(enabled);
+        expired.setEnabled(true);
         consumers.setEnabled(enabled);
         producers.setEnabled(enabled);
         messages.setEnabled(enabled);
@@ -134,6 +143,7 @@
             dispatched.setParent(parent.dispatched);
             dequeues.setParent(parent.dequeues);
             inflight.setParent(parent.inflight);
+            expired.setParent(parent.expired);
             consumers.setParent(parent.consumers);
             producers.setParent(parent.producers);
             messagesCached.setParent(parent.messagesCached);
@@ -144,6 +154,7 @@
             dispatched.setParent(null);
             dequeues.setParent(null);
             inflight.setParent(null);
+            expired.setParent(null);
             consumers.setParent(null);
             producers.setParent(null);
             messagesCached.setParent(null);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jul  3 12:02:07 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -66,6 +67,7 @@
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.DeterministicTaskRunner;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -112,7 +114,13 @@
             wakeup();
         }
     };
+    private final Runnable expireMessagesTask = new Runnable() {
+        public void run() {
+            expireMessages();          
+        }
+    };
     private final Object iteratingMutex = new Object() {};
+    private static final Scheduler scheduler = Scheduler.getInstance();
     
     private static final Comparator<Subscription>orderedCompare = new Comparator<Subscription>() {
 
@@ -177,6 +185,11 @@
                
             this.taskRunner = new DeterministicTaskRunner(this.executor,this);
         }
+        
+        if (getExpireMessagesPeriod() > 0) {
+            scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
+        }
+        
         super.initialize();
         if (store != null) {
             // Restore the persistent messages.
@@ -192,7 +205,7 @@
                         // Message could have expired while it was being
                         // loaded..
                         if (broker.isExpired(message)) {
-                            messageExpired(createConnectionContext(), message);
+                            messageExpired(createConnectionContext(), null, message, false);
                             return true;
                         }
                         if (hasSpace()) {
@@ -416,11 +429,11 @@
                             public void run() {
     
                                 try {
-    
                                     // While waiting for space to free up... the
                                     // message may have expired.
                                     if (message.isExpired()) {
                                         broker.messageExpired(context, message);
+                                        destinationStatistics.getExpired().increment();
                                     } else {
                                         doMessageSend(producerExchange, message);
                                     }
@@ -498,6 +511,7 @@
                         throw new IOException(
                                 "Connection closed, send aborted.");
                     }
+                    LOG.debug(this  + ", waiting for store space... msg: " + message);
                 }
                 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                 store.addMessage(context, message);
@@ -516,8 +530,7 @@
                         // op, by that time the message could have expired..
                         if (broker.isExpired(message)) {
                             broker.messageExpired(context, message);
-                            //message not added to stats yet
-                            //destinationStatistics.getMessages().decrement();
+                            destinationStatistics.getExpired().increment();
                             return;
                         }
                         sendMessage(context, message);
@@ -537,9 +550,34 @@
             sendMessage(context, message);
         }
     }
+    
+    private void expireMessages() {
+        LOG.info("expiring messages...");
 
-	public void gc(){
-	}
+        // just track the insertion count
+        List<Message> l = new AbstractList<Message>() {
+            int size = 0;
+
+            @Override
+            public void add(int index, Message element) {
+                size++;
+            }
+
+            @Override
+            public int size() {
+                return size;
+            }
+
+            @Override
+            public Message get(int index) {
+                return null;
+            }
+        };
+        doBrowse(true, l, getMaxBrowsePageSize());
+    }
+
+    public void gc(){
+    }
     
     public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
         messageConsumed(context, node);
@@ -593,6 +631,10 @@
         if (this.executor != null) {
             this.executor.shutdownNow();
         }
+        
+        LOG.info(toString() + ", canceling expireMessagesTask");
+        scheduler.cancel(expireMessagesTask);
+        
         if (messages != null) {
             messages.stop();
         }
@@ -691,57 +733,74 @@
         return result;
     }
 
-    public Message[] browse() {
-        int count = 0;
+    public Message[] browse() {    
         List<Message> l = new ArrayList<Message>();
+        doBrowse(false, l, getMaxBrowsePageSize());
+        return l.toArray(new Message[l.size()]);
+    }
+    
+    public void doBrowse(boolean forcePageIn, List<Message> l, int max) {
+        final ConnectionContext connectionContext = createConnectionContext();
         try {
-            pageInMessages(false);
-            synchronized (this.pagedInPendingDispatch) {
-                for (Iterator<QueueMessageReference> i = this.pagedInPendingDispatch
-                        .iterator(); i.hasNext()
-                        && count < getMaxBrowsePageSize();) {
-                    l.add(i.next().getMessage());
-                    count++;
+            pageInMessages(forcePageIn);
+            List<MessageReference> toExpire = new ArrayList<MessageReference>();
+            dispatchLock.lock();
+            try {
+                synchronized (pagedInPendingDispatch) {
+                    addAll(pagedInPendingDispatch, l, max, toExpire);
+                    for (MessageReference ref : toExpire) {
+                        pagedInPendingDispatch.remove(ref);
+                        messageExpired(connectionContext, ref, false);
+                    }
                 }
-            }
-            if (count < getMaxBrowsePageSize()) {
+                toExpire.clear();
                 synchronized (pagedInMessages) {
-                    for (Iterator<QueueMessageReference> i = this.pagedInMessages
-                            .values().iterator(); i.hasNext()
-                            && count < getMaxBrowsePageSize();) {
-                        Message m = i.next().getMessage();
-                        if (l.contains(m) == false) {
-                            l.add(m);
-                            count++;
-                        }
-                    }
+                    addAll(pagedInMessages.values(), l, max, toExpire);   
                 }
-            }
-            if (count < getMaxBrowsePageSize()) {
-                synchronized (messages) {
-                    try {
-                        messages.reset();
-                        while (messages.hasNext()
-                                && count < getMaxBrowsePageSize()) {
-                            MessageReference node = messages.next();
-                            messages.rollback(node.getMessageId());
-                            if (node != null) {
-                                Message m = node.getMessage();
-                                if (l.contains(m) == false) {
-                                    l.add(m);
-                                    count++;
+                for (MessageReference ref : toExpire) {
+                    messageExpired(connectionContext, ref, false);
+                }
+                
+                if (l.size() < getMaxBrowsePageSize()) {
+                    synchronized (messages) {
+                        try {
+                            messages.reset();
+                            while (messages.hasNext() && l.size() < max) {
+                                MessageReference node = messages.next();
+                                messages.rollback(node.getMessageId());
+                                if (node != null) {
+                                    if (broker.isExpired(node)) {
+                                        messageExpired(connectionContext,
+                                                createMessageReference(node.getMessage()), false);
+                                    } else if (l.contains(node.getMessage()) == false) {
+                                        l.add(node.getMessage());
+                                    }
                                 }
                             }
+                        } finally {
+                            messages.release();
                         }
-                    } finally {
-                        messages.release();
                     }
                 }
+            } finally {
+                dispatchLock.unlock();
             }
         } catch (Exception e) {
-            LOG.error("Problem retrieving message in browse() ", e);
+            LOG.error("Problem retrieving message for browse", e);
+        }
+    }
+
+    private void addAll(Collection<QueueMessageReference> refs,
+            List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
+        for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext()
+                && l.size() < getMaxBrowsePageSize();) {
+            QueueMessageReference ref = i.next();
+            if (broker.isExpired(ref)) {
+                toExpire.add(ref);
+            } else if (l.contains(ref.getMessage()) == false) {
+                l.add(ref.getMessage());
+            }
         }
-        return l.toArray(new Message[l.size()]);
     }
 
     public Message getMessage(String id) {
@@ -1190,22 +1249,26 @@
         }
     }
     
-    public void messageExpired(ConnectionContext context,MessageReference reference) {
-        messageExpired(context,null,reference);
+    public void messageExpired(ConnectionContext context,MessageReference reference, boolean dispatched) {
+        messageExpired(context,null,reference, dispatched);
     }
     
     public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
+        messageExpired(context, subs, reference, true);
+    }
+    
+    public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference, boolean dispatched) {
         broker.messageExpired(context, reference);
         destinationStatistics.getDequeues().increment();
-        destinationStatistics.getInflight().decrement();
+        destinationStatistics.getExpired().increment();
+        if (dispatched) {
+            destinationStatistics.getInflight().decrement();
+        }
         try {
             removeMessage(context,subs,(QueueMessageReference)reference);
         } catch (IOException e) {
             LOG.error("Failed to remove expired Message from the store ",e);
         }
-        synchronized(pagedInMessages) {
-            pagedInMessages.remove(reference.getMessageId());
-        }
         wakeup();
     }
     
@@ -1286,7 +1349,7 @@
                                 result.add(ref);
                                 count++;
                             } else {
-                                messageExpired(createConnectionContext(), ref);
+                                messageExpired(createConnectionContext(), ref, false);
                             }
                         }
                     } finally {
@@ -1312,7 +1375,7 @@
         }
         return resultList;
     }
-    
+
     private void doDispatch(List<QueueMessageReference> list) throws Exception {
         dispatchLock.lock();
         try {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Jul  3 12:02:07 2009
@@ -709,6 +709,10 @@
 							BrokerSupport.resend(context,message,
 							        deadLetterDestination);
 						}
+					} else {
+					    if (LOG.isDebugEnabled()) {
+					        LOG.debug("Expired message with no DLQ strategy in place");
+					    }
 					}
 				}
 			}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Jul  3 12:02:07 2009
@@ -278,6 +278,7 @@
         // destination.. it may have expired.
         if (message.isExpired()) {
             broker.messageExpired(context, message);
+            getDestinationStatistics().getExpired().increment();
             if (sendProducerAck) {
                 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                 context.getConnection().dispatchAsync(ack);
@@ -306,6 +307,7 @@
                                     // While waiting for space to free up... the
                                     // message may have expired.
                                     if (message.isExpired()) {
+                                        getDestinationStatistics().getExpired().increment();
                                         broker.messageExpired(context, message);
                                     } else {
                                         doMessageSend(producerExchange, message);
@@ -361,6 +363,7 @@
                     // The usage manager could have delayed us by the time
                     // we unblock the message could have expired..
                     if (message.isExpired()) {
+                        getDestinationStatistics().getExpired().increment();
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Expired message: " + message);
                         }
@@ -418,6 +421,7 @@
                     // operration.. by that time the message could have
                     // expired..
                     if (broker.isExpired(message)) {
+                        getDestinationStatistics().getExpired().increment();
                         broker.messageExpired(context, message);
                         message.decrementReferenceCount();
                         return;
@@ -594,6 +598,7 @@
         broker.messageExpired(context, reference);
         destinationStatistics.getMessages().decrement();
         destinationStatistics.getEnqueues().decrement();
+        destinationStatistics.getExpired().increment();
         MessageAck ack = new MessageAck();
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
         ack.setDestination(destination);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Jul  3 12:02:07 2009
@@ -161,6 +161,7 @@
                     matched.remove();
                     dispatchedCounter.incrementAndGet();
                     node.decrementReferenceCount();
+                    node.getRegionDestination().getDestinationStatistics().getExpired().increment();
                     broker.messageExpired(getContext(), node);
                     break;
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Jul  3 12:02:07 2009
@@ -73,15 +73,15 @@
     private boolean advisoryWhenFull;
     private boolean advisoryForDelivery;
     private boolean advisoryForConsumed;
+    private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
+    private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
    
     public void configure(Broker broker,Queue queue) {
         baseConfiguration(queue);
         if (dispatchPolicy != null) {
             queue.setDispatchPolicy(dispatchPolicy);
         }
-        if (deadLetterStrategy != null) {
-            queue.setDeadLetterStrategy(deadLetterStrategy);
-        }
+        queue.setDeadLetterStrategy(getDeadLetterStrategy());
         queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
         if (memoryLimit > 0) {
             queue.getMemoryUsage().setLimit(memoryLimit);
@@ -104,9 +104,7 @@
         if (dispatchPolicy != null) {
             topic.setDispatchPolicy(dispatchPolicy);
         }
-        if (deadLetterStrategy != null) {
-            topic.setDeadLetterStrategy(deadLetterStrategy);
-        }
+        topic.setDeadLetterStrategy(getDeadLetterStrategy());
         if (subscriptionRecoveryPolicy != null) {
             topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
         }
@@ -132,6 +130,8 @@
         destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers());
         destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
         destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
+        destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
+        destination.setMaxExpirePageSize(getMaxExpirePageSize());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -543,4 +543,21 @@
         this.advisdoryForFastProducers = advisdoryForFastProducers;
     }
 
+    public void setMaxExpirePageSize(int maxExpirePageSize) {
+        this.maxExpirePageSize = maxExpirePageSize;
+    }
+    
+    public int getMaxExpirePageSize() {
+        return maxExpirePageSize;
+    }
+    
+    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
+        this.expireMessagesPeriod = expireMessagesPeriod;
+    }
+    
+    public long getExpireMessagesPeriod() {
+        return expireMessagesPeriod;
+    }
+
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Fri Jul  3 12:02:07 2009
@@ -242,8 +242,8 @@
             LOG.debug("Memory usage change from: " + oldPercentUsage + "% of available memory, to: " 
                 + newPercentUsage + "% of available memory");
         }   
-        if (newPercentUsage >= 80) {
-            LOG.warn("Memory usage is now over 80%!");
+        if (newPercentUsage >= 100) {
+            LOG.warn("Memory usage is now at " + newPercentUsage  + "%");
         }
              
         if (started.get()) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Fri Jul  3 12:02:07 2009
@@ -19,7 +19,6 @@
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -28,8 +27,6 @@
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java Fri Jul  3 12:02:07 2009
@@ -1,6 +1,7 @@
 package org.apache.activemq.bugs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import javax.jms.ConnectionFactory;
 import javax.jms.MessageProducer;
@@ -16,6 +17,8 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.junit.After;
 import org.junit.Before;
@@ -56,8 +59,16 @@
         broker = new BrokerService();
 //        broker.setPersistent(false);
 //        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(true);
         broker.setBrokerName(brokerName);
         broker.addConnector(brokerUrl);
+        
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(500);
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+        
         broker.start();
     }
     
@@ -85,15 +96,13 @@
         }
         
         // Let the messages expire 
-        Thread.sleep(1000);
+        Thread.sleep(2000);
         
         DestinationViewMBean view = createView(destination);
         
-        /*################### CURRENT EXPECTED FAILURE ####################*/ 
-        // The messages expire and should be reaped but they're not currently 
-        // reaped until there is an active consumer placed on the queue 
-        assertEquals("Incorrect count: " + view.getInFlightCount(), 0, view.getInFlightCount());
-        
+        assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
+        assertEquals("Incorrect queue size count", 0, view.getQueueSize());
+        assertEquals("Incorrect enqueue count", 3, view.getEnqueueCount());
         
         // Send more messages with an expiration 
         for (int i = 0; i < count; i++) {
@@ -101,10 +110,13 @@
             producer.send(message);
         }
         
+        // Let the messages expire 
+        Thread.sleep(2000);
+        
         // Simply browse the queue 
         Session browserSession = createSession();
         QueueBrowser browser = browserSession.createBrowser((Queue) destination);
-        browser.getEnumeration(); 
+        assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements()); 
         
         // The messages expire and should be reaped because of the presence of 
         // the queue browser 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Fri Jul  3 12:02:07 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.usecases;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import javax.jms.Connection;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -33,17 +35,21 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 
 
 public class ExpiredMessagesTest extends CombinationTestSupport {
 
+    private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class);
+    
 	BrokerService broker;
 	Connection connection;
 	Session session;
 	MessageProducer producer;
 	MessageConsumer consumer;
-	public ActiveMQDestination destination;
+	public ActiveMQDestination destination = new ActiveMQQueue("test");
 	
     public static Test suite() {
         return suite(ExpiredMessagesTest.class);
@@ -77,6 +83,7 @@
 		producer.setTimeToLive(100);
 		consumer = session.createConsumer(destination);
 		connection.start();
+		final AtomicLong received = new AtomicLong();
 		
 		Thread consumerThread = new Thread("Consumer Thread") {
 			public void run() {
@@ -84,7 +91,9 @@
 				try {
 					long end = System.currentTimeMillis();
 					while (end - start < 3000) {
-						consumer.receive(1000);
+						if (consumer.receive(1000) != null) {
+						    received.incrementAndGet();
+						}
 						Thread.sleep(100);
 						end = System.currentTimeMillis();
 					}
@@ -115,9 +124,13 @@
         consumerThread.join();
         producingThread.join();
         
+        
         DestinationViewMBean view = createView(destination);
+        LOG.info("Stats: received: "  + received.get() + ", enqueues: " + view.getEnqueueCount() + ", dequeues: " + view.getDequeueCount()
+                + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
         
-        assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount());
+        assertEquals("got what did not expire", received.get(), view.getDequeueCount() - view.getExpiredCount());
+        //assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount());
 	}
 	
 	protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {