You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/07/20 19:08:14 UTC

svn commit: r558054 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apa...

Author: rajdavies
Date: Fri Jul 20 10:08:10 2007
New Revision: 558054

URL: http://svn.apache.org/viewvc?view=rev&rev=558054
Log:
Fix for:
 http://issues.apache.org/activemq/browse/AMQ-1207
http://issues.apache.org/activemq/browse/AMQ-880
http://issues.apache.org/activemq/browse/AMQ-450
http://issues.apache.org/activemq/browse/AMQ-879

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Fri Jul 20 10:08:10 2007
@@ -17,6 +17,7 @@
  */
 package org.apache.activemq.advisory;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.activemq.broker.Broker;
@@ -24,6 +25,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -38,6 +40,8 @@
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -49,7 +53,7 @@
  */
 public class AdvisoryBroker extends BrokerFilter {
     
-    //private static final Log log = LogFactory.getLog(AdvisoryBroker.class);
+    private static final Log log = LogFactory.getLog(AdvisoryBroker.class);
     
     protected final ConcurrentHashMap connections = new ConcurrentHashMap();
     protected final ConcurrentHashMap consumers = new ConcurrentHashMap();
@@ -226,6 +230,16 @@
             ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
             producers.remove(info.getProducerId());
             fireProducerAdvisory(context, topic, info.createRemoveCommand());
+        }
+    }
+    
+    public void messageExpired(ConnectionContext context,MessageReference messageReference){
+        next.messageExpired(context,messageReference);
+        try{
+            ActiveMQTopic topic=AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
+            fireAdvisory(context,topic,messageReference.getMessage());
+        }catch(Exception e){
+            log.warn("Failed to fire message expired advisory");
         }
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Fri Jul 20 10:08:10 2007
@@ -64,6 +64,13 @@
             return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX+destination.getPhysicalName());
     }
     
+    public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
+        if (destination.isQueue()) {
+            return getExpiredQueueMessageAdvisoryTopic(destination);
+        }
+        return getExpiredTopicMessageAdvisoryTopic(destination);
+    }
+    
     public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
         String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName();
         return new ActiveMQTopic(name);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Fri Jul 20 10:08:10 2007
@@ -20,19 +20,15 @@
 import java.net.URI;
 import java.util.Set;
 import org.apache.activemq.Service;
-import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Region;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
@@ -135,6 +131,8 @@
     
     /**
      * Gets a list of all the prepared xa transactions.
+     * @param context the connection context
+     * @return the ids of the prepared transactions
      * @throws Exception TODO
      */
     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception;
@@ -151,7 +149,7 @@
      * Prepares a transaction. Only valid for xa transactions.
      * @param context
      * @param xid
-     * @return
+     * @return id
      * @throws Exception TODO
      */
     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception;
@@ -176,6 +174,9 @@
 
     /**
      * Forgets a transaction.
+     * @param context 
+     * @param transactionId 
+     * @throws Exception 
      */
     public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception;
     
@@ -246,7 +247,35 @@
      */
     public URI getVmConnectorURI();
     
+    /**
+     * called when the brokerService starts
+     */
     public void brokerServiceStarted();
     
+    /**
+     * @return the BrokerService
+     */
     BrokerService getBrokerService();
+    
+    /**
+     * Ensure we get the Broker at the top of the Stack
+     * @return the broker at the top of the Stack
+     */
+    Broker getRoot();
+    
+    /**
+     * A Message has Expired
+     * @param context
+     * @param messageReference
+     * @throws Exception 
+     */
+    public void messageExpired(ConnectionContext context, MessageReference messageReference);
+    
+    /**
+     * A message needs to go to a DLQ
+     * @param context
+     * @param messageReference
+     * @throws Exception
+     */
+    public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Fri Jul 20 10:08:10 2007
@@ -17,9 +17,12 @@
  */
 package org.apache.activemq.broker;
 
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -38,10 +41,6 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 
-import java.net.URI;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Allows you to intercept broker operation so that features such as security can be 
  * implemented as a pluggable filter.
@@ -245,5 +244,17 @@
     
     public BrokerService getBrokerService(){
         return next.getBrokerService();
+    }
+
+    public void messageExpired(ConnectionContext context,MessageReference message){
+        next.messageExpired(context,message); 
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
+       next.sendToDeadLetterQueue(context,messageReference);       
+    }
+
+    public Broker getRoot() {
+       return next.getRoot();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Fri Jul 20 10:08:10 2007
@@ -17,9 +17,13 @@
  */
 package org.apache.activemq.broker;
 
+import java.net.URI;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -38,11 +42,6 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 
-import java.net.URI;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Dumb implementation - used to be overriden by listeners
  * 
@@ -245,4 +244,14 @@
     public BrokerService getBrokerService(){
         return null;
     }
+
+    public void messageExpired(ConnectionContext context,MessageReference message){        
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){        
+    }
+    
+    public Broker getRoot(){
+        return null;
+     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Fri Jul 20 10:08:10 2007
@@ -21,10 +21,9 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -245,4 +244,16 @@
     public BrokerService getBrokerService(){
         throw new BrokerStoppedException(this.message);
     }
+
+    public void messageExpired(ConnectionContext context,MessageReference message){
+       throw new BrokerStoppedException(this.message);        
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
+       throw new BrokerStoppedException(this.message); 
+    }
+    
+    public Broker getRoot(){
+        throw new BrokerStoppedException(this.message);
+     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Fri Jul 20 10:08:10 2007
@@ -17,9 +17,12 @@
  */
 package org.apache.activemq.broker;
 
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -38,10 +41,6 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 
-import java.net.URI;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Like a BrokerFilter but it allows you to switch the getNext().broker.  This has more 
  * overhead than a BrokerFilter since access to the getNext().broker has to synchronized
@@ -258,6 +257,19 @@
     
     public BrokerService getBrokerService(){
         return getNext().getBrokerService();
+    }
+
+   
+    public void messageExpired(ConnectionContext context,MessageReference message){
+        getNext().messageExpired(context,message);        
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference) {
+       getNext().sendToDeadLetterQueue(context,messageReference);
+    }
+    
+    public Broker getRoot(){
+        return getNext().getRoot();
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Jul 20 10:08:10 2007
@@ -332,14 +332,15 @@
                     // Try to auto create the destination... re-invoke broker from the
                     // top so that the proper security checks are performed.
                     try {
+                        
+                        context.getBroker().addDestination(context,destination);
                         dest = addDestination(context, destination);
-                        //context.getBroker().addDestination(context,destination);
                     }
                     catch (DestinationAlreadyExistsException e) {
                         // if the destination already exists then lets ignore this error
                     }
                     // We should now have the dest created.
-                    //dest=(Destination) destinations.get(destination);
+                    dest=(Destination) destinations.get(destination);
                 }
                 if(dest==null){
                     throw new JMSException("The destination "+destination+" does not exist.");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Fri Jul 20 10:08:10 2007
@@ -42,118 +42,121 @@
  * @author fateev@amazon.com
  * @version $Revision$
  */
-public class DestinationFactoryImpl extends DestinationFactory {
+public class DestinationFactoryImpl extends DestinationFactory{
 
     protected final UsageManager memoryManager;
     protected final TaskRunnerFactory taskRunnerFactory;
     protected final PersistenceAdapter persistenceAdapter;
     protected RegionBroker broker;
 
-    public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
-            PersistenceAdapter persistenceAdapter) {
-        this.memoryManager = memoryManager;
-        this.taskRunnerFactory = taskRunnerFactory;
-        if (persistenceAdapter == null) {
+    public DestinationFactoryImpl(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
+            PersistenceAdapter persistenceAdapter){
+        this.memoryManager=memoryManager;
+        this.taskRunnerFactory=taskRunnerFactory;
+        if(persistenceAdapter==null){
             throw new IllegalArgumentException("null persistenceAdapter");
         }
-        this.persistenceAdapter = persistenceAdapter;
+        this.persistenceAdapter=persistenceAdapter;
     }
 
-    public void setRegionBroker(RegionBroker broker) {
-        if (broker == null) {
+    public void setRegionBroker(RegionBroker broker){
+        if(broker==null){
             throw new IllegalArgumentException("null broker");
         }
-        this.broker = broker;
+        this.broker=broker;
     }
 
-    public Set getDestinations() {
+    public Set getDestinations(){
         return persistenceAdapter.getDestinations();
     }
 
     /**
      * @return instance of {@link Queue} or {@link Topic}
      */
-    public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception {
-        if (destination.isQueue()) {
-            if (destination.isTemporary()) {
-                final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
-                return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) {
-                   
-                    public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
+    public Destination createDestination(ConnectionContext context,ActiveMQDestination destination,
+            DestinationStatistics destinationStatistics) throws Exception{
+        if(destination.isQueue()){
+            if(destination.isTemporary()){
+                final ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destination;
+                return new Queue(broker.getRoot(),destination,memoryManager,null,
+                        destinationStatistics,taskRunnerFactory,broker.getTempDataStore()){
+
+                    public void addSubscription(ConnectionContext context,Subscription sub) throws Exception{
                         // Only consumers on the same connection can consume from 
                         // the temporary destination
-                        if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
+                        if(!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())){
                             throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
                         }
-                        super.addSubscription(context, sub);
+                        super.addSubscription(context,sub);
                     };
                 };
-            } else {
-                MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
-                Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore());
-                configureQueue(queue, destination);
+            }else{
+                MessageStore store=persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
+                Queue queue=new Queue(broker.getRoot(),destination,memoryManager,store,
+                        destinationStatistics,taskRunnerFactory,broker.getTempDataStore());
+                configureQueue(queue,destination);
                 queue.initialize();
                 return queue;
             }
-        } else if (destination.isTemporary()){
-            final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
-            return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
-                public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
+        }else if(destination.isTemporary()){
+            final ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destination;
+            return new Topic(broker.getRoot(),destination,null,memoryManager,
+                    destinationStatistics,taskRunnerFactory){
+
+                public void addSubscription(ConnectionContext context,Subscription sub) throws Exception{
                     // Only consumers on the same connection can consume from 
                     // the temporary destination
-                    if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
+                    if(!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())){
                         throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
                     }
-                    super.addSubscription(context, sub);
+                    super.addSubscription(context,sub);
                 };
             };
-        } else {
-            TopicMessageStore store = null;
-            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
-                store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
+        }else{
+            TopicMessageStore store=null;
+            if(!AdvisorySupport.isAdvisoryTopic(destination)){
+                store=persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination);
             }
-            
-            Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
-            configureTopic(topic, destination);
-            
+            Topic topic=new Topic(broker.getRoot(),destination,store,memoryManager,
+                    destinationStatistics,taskRunnerFactory);
+            configureTopic(topic,destination);
             return topic;
         }
     }
 
-    protected void configureQueue(Queue queue, ActiveMQDestination destination) {
-        if (broker == null) {
+    protected void configureQueue(Queue queue,ActiveMQDestination destination){
+        if(broker==null){
             throw new IllegalStateException("broker property is not set");
         }
-        if (broker.getDestinationPolicy() != null) {
-            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
-            if (entry != null) {
+        if(broker.getDestinationPolicy()!=null){
+            PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+            if(entry!=null){
                 entry.configure(queue,broker.getTempDataStore());
             }
         }
     }
 
-    protected void configureTopic(Topic topic, ActiveMQDestination destination) {
-        if (broker == null) {
+    protected void configureTopic(Topic topic,ActiveMQDestination destination){
+        if(broker==null){
             throw new IllegalStateException("broker property is not set");
         }
-        if (broker.getDestinationPolicy() != null) {
-            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
-            if (entry != null) {
+        if(broker.getDestinationPolicy()!=null){
+            PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+            if(entry!=null){
                 entry.configure(topic);
             }
         }
     }
 
-    public long getLastMessageBrokerSequenceId() throws IOException {
+    public long getLastMessageBrokerSequenceId() throws IOException{
         return persistenceAdapter.getLastMessageBrokerSequenceId();
     }
 
-    public PersistenceAdapter getPersistenceAdapter() {
+    public PersistenceAdapter getPersistenceAdapter(){
         return persistenceAdapter;
     }
 
-    public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException {
+    public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException{
         return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jul 20 10:08:10 2007
@@ -276,17 +276,7 @@
      * @throws Exception
      */
     protected void sendToDLQ(final ConnectionContext context,final MessageReference node) throws IOException,Exception{
-        // Send the message to the DLQ
-        Message message=node.getMessage();
-        if(message!=null){
-            // The original destination and transaction id do not get filled when the message is first
-            // sent,
-            // it is only populated if the message is routed to another destination like the DLQ
-            DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
-            ActiveMQDestination deadLetterDestination=deadLetterStrategy
-                    .getDeadLetterQueueFor(message.getDestination());
-            BrokerSupport.resend(context,message,deadLetterDestination);
-        }
+        broker.sendToDeadLetterQueue(context,node);
     }
 
     /**
@@ -393,7 +383,9 @@
                             // Message may have been sitting in the pending list a while
                             // waiting for the consumer to ak the message.
                             if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
-                                continue; // just drop it.
+                                broker.messageExpired(getContext(),node);
+                                dequeueCounter++;
+                                continue;
                             }
                             dispatch(node);
                             count++;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jul 20 10:08:10 2007
@@ -27,6 +27,7 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -72,7 +73,6 @@
 public class Queue implements Destination, Task {
 
     private final Log log;
-
     private final ActiveMQDestination destination;
     private final List consumers = new CopyOnWriteArrayList();
     private final Valve dispatchValve = new Valve(true);
@@ -96,9 +96,11 @@
     private final Object doDispatchMutex = new Object();
     private TaskRunner taskRunner;
     private boolean started = false;
+    final Broker broker;
     
-    public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
+    public Queue(Broker broker,ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
             TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
+        this.broker=broker;
         this.destination = destination;
         this.usageManager = new UsageManager(memoryManager,destination.toString());
         this.usageManager.setUsagePortion(1.0f);
@@ -136,7 +138,8 @@
                     public void recoverMessage(Message message){
                         // Message could have expired while it was being loaded..
                         if(message.isExpired()){
-                            // TODO remove from store
+                            broker.messageExpired(createConnectionContext(),message);
+                            destinationStatistics.getMessages().decrement();
                             return;
                         }
                         message.setRegionDestination(Queue.this);
@@ -342,9 +345,8 @@
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
         if(message.isExpired()){
-            if (log.isDebugEnabled()) {
-                log.debug("Expired message: " + message);
-            }
+            broker.messageExpired(context,message);
+            destinationStatistics.getMessages().decrement();
             if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
         		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
 				context.getConnection().dispatchAsync(ack);	    	            	        		
@@ -365,9 +367,8 @@
         					
         					// While waiting for space to free up... the message may have expired.
         			        if(message.isExpired()){
-        			            if (log.isDebugEnabled()) {
-        			                log.debug("Expired message: " + message);
-        			            }
+        			            broker.messageExpired(context,message);
+                                destinationStatistics.getMessages().decrement();
         			            
         			            if( !message.isResponseRequired() && !context.isInRecoveryMode() ) {
         			        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
@@ -440,10 +441,8 @@
                         // It could take while before we receive the commit
                         // op, by that time the message could have expired..
 	                    if(message.isExpired()){
-	                        // TODO: remove message from store.
-	                        if (log.isDebugEnabled()) {
-	                            log.debug("Expired message: " + message);
-	                        }
+	                        broker.messageExpired(context,message);
+                            destinationStatistics.getMessages().decrement();
 	                        return;
 	                    }
 	                    sendMessage(context,message);
@@ -1011,9 +1010,8 @@
                                 result.add(node);
                                 count++;
                             }else{
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Expired message: " + node);
-                                }
+                                broker.messageExpired(createConnectionContext(),node);
+                                destinationStatistics.getMessages().decrement();
                             }
                         }
                     }finally{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Jul 20 10:08:10 2007
@@ -37,6 +37,7 @@
 import org.apache.activemq.broker.DestinationAlreadyExistsException;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
@@ -62,6 +63,7 @@
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.BrokerSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.ServiceStopper;
@@ -625,6 +627,77 @@
     public BrokerService getBrokerService(){
         return brokerService;
     }
-    
-    
+
+    /**
+     * Called when a message has been found to be expired. Logs the event at debug level and
+     * passes the node to the root broker's {@code sendToDeadLetterQueue}.
+     *
+     * @param context the connection context handed on to the dead letter handling
+     * @param node the reference to the expired message
+     */
+    public void messageExpired(ConnectionContext context,MessageReference node){
+        if(log.isDebugEnabled()){
+            log.debug("Message expired "+node);
+        }
+        getRoot().sendToDeadLetterQueue(context,node);
+    }
+
+    /**
+     * Resends the referenced message to the dead letter destination chosen by the
+     * {@link DeadLetterStrategy} of the node's region destination. Nothing is sent when the node
+     * is null, the destination has no strategy, or the strategy declines the message; a node
+     * without a message only produces a warning. Before the resend the expiration is reset to 0
+     * and recorded in the "originalExpiration" property; a non-persistent message is made
+     * persistent and flagged through the "originalDeliveryMode" property.
+     * Exceptions are logged and not propagated to the caller.
+     *
+     * @param context the connection context used for the resend
+     * @param node the message reference to move; may be null
+     */
+    public void sendToDeadLetterQueue(ConnectionContext context,MessageReference node){
+        try{
+            if(node!=null){
+                Message message=node.getMessage();
+                if(message!=null){
+                    DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
+                    if(deadLetterStrategy!=null){
+                        if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
+                            long expiration=message.getExpiration();
+                            message.setExpiration(0);
+                            message.setProperty("originalExpiration",Long.valueOf(expiration));
+                            if(!message.isPersistent()){
+                                message.setPersistent(true);
+                                message.setProperty("originalDeliveryMode","NON_PERSISTENT");
+                            }
+                            // The original destination and transaction id are not filled in when the message is
+                            // first sent; they are only populated when it is routed elsewhere, like the DLQ
+                            ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message
+                                    .getDestination());
+                            BrokerSupport.resend(context,message,deadLetterDestination);
+                        }
+                    }
+                }else{
+                    log.warn("Null message for node: "+node);
+                }
+            }
+        }catch(Exception e){
+            // hand the exception to the log so the cause of the failure is not lost
+            log.warn("Failed to pass expired message to dead letter queue",e);
+        }
+    }
+
+    /**
+     * Returns the broker held by the {@link BrokerService}.
+     *
+     * @return the broker returned by the BrokerService
+     * @throws RuntimeException wrapping the original failure if the broker cannot be obtained
+     */
+    public Broker getRoot(){
+        try{
+            return getBrokerService().getBroker();
+        }catch(Exception e){
+            log.fatal("Trying to get Root Broker "+e,e);
+            throw new RuntimeException("The broker from the BrokerService should not throw an exception",e);
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Fri Jul 20 10:08:10 2007
@@ -41,7 +41,7 @@
     
     protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
         final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
-        return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) {
+        return new Queue(broker.getRoot(),destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) {
             
             public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Jul 20 10:08:10 2007
@@ -24,6 +24,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@@ -72,10 +73,11 @@
     private boolean sendAdvisoryIfNoConsumers;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap();
+    final Broker broker;
     
-    public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
+    public Topic(Broker broker,ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
             TaskRunnerFactory taskFactory) {
-
+        this.broker=broker;
         this.destination = destination;
         this.store = store; //this could be NULL! (If an advsiory)
         this.usageManager = new UsageManager(memoryManager,destination.toString());
@@ -261,9 +263,8 @@
     	// There is delay between the client sending it and it arriving at the
     	// destination.. it may have expired.
     	if( message.isExpired() ) {
-            if (log.isDebugEnabled()) {
-                log.debug("Expired message: " + message);
-            }
+            broker.messageExpired(context,message);
+            destinationStatistics.getMessages().decrement();
             if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
         		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
 				context.getConnection().dispatchAsync(ack);	    	            	        		
@@ -285,9 +286,8 @@
         					
         					// While waiting for space to free up... the message may have expired.
         			        if(message.isExpired()){
-        			            if (log.isDebugEnabled()) {
-        			                log.debug("Expired message: " + message);
-        			            }
+        			            broker.messageExpired(context,message);
+                                destinationStatistics.getMessages().decrement();
         			            
         			            if( !message.isResponseRequired() && !context.isInRecoveryMode() ) {
         			        		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
@@ -357,7 +357,9 @@
                     	// It could take while before we receive the commit
                     	// operration.. by that time the message could have expired..
                     	if( message.isExpired() ) {
-                    		// TODO: remove message from store.
+                    		broker.messageExpired(context,message);
+                            message.decrementReferenceCount();
+                            destinationStatistics.getMessages().decrement();
                     		return;
                     	}
                         dispatch(context, message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Jul 20 10:08:10 2007
@@ -103,12 +103,7 @@
                             int messagesToEvict=oldMessages.length;
                             for(int i=0;i<messagesToEvict;i++){
                                 MessageReference oldMessage=oldMessages[i];
-                                oldMessage.decrementReferenceCount();
-                                matched.remove(oldMessage);
-                                discarded++;
-                                if(log.isDebugEnabled()){
-                                    log.debug("Discarding message "+oldMessages[i]);
-                                }
+                                discard(oldMessage);
                             }
                             // lets avoid an infinite loop if we are given a bad eviction strategy
                             // for a bad strategy lets just not evict
@@ -138,6 +133,7 @@
                     matched.remove();
                     dispatchedCounter.incrementAndGet();
                     node.decrementReferenceCount();
+                    broker.messageExpired(getContext(),node);
                     break;
                 }
             }
@@ -367,6 +363,8 @@
                     // waiting for the consumer to ak the message.
                     if(message.isExpired()){
                         message.decrementReferenceCount();
+                        broker.messageExpired(getContext(),message);
+                        dequeueCounter.incrementAndGet();
                         continue; // just drop it.
                     }
                     dispatch(message);
@@ -409,6 +407,17 @@
             node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
             node.decrementReferenceCount();
         }
+    }
+    
+    private void discard(MessageReference message) { // removes a message from the matched list and passes it on to the dead letter handling
+        message.decrementReferenceCount();
+        matched.remove(message);
+        discarded++;
+        dequeueCounter.incrementAndGet(); // a discarded message is also counted as dequeued
+        if(log.isDebugEnabled()){
+            log.debug("Discarding message "+message);
+        }
+        broker.getRoot().sendToDeadLetterQueue(getContext(),message); // delegated to the root broker
     }
 
     public String toString(){

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java?view=auto&rev=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java Fri Jul 20 10:08:10 2007
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.command.Message;
+
+/**
+ * Base class for {@link DeadLetterStrategy} implementations. It holds the two flags that decide
+ * whether expired and non-persistent messages are sent to a dead letter queue.
+ * 
+ * @version $Revision: 426366 $
+ */
+public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
+    private boolean processNonPersistent=true;
+    private boolean processExpired=true;
+
+    /**
+     * Decides whether the given message should be sent to a dead letter queue.
+     *
+     * @param message the message to check; may be null
+     * @return false if the message is null, if it is non-persistent while processNonPersistent
+     *         is disabled, or if it is expired while processExpired is disabled; true otherwise
+     */
+    public boolean isSendToDeadLetterQueue(Message message){
+        if(message==null){
+            return false;
+        }
+        if(!message.isPersistent()&&!processNonPersistent){
+            return false;
+        }
+        return !message.isExpired()||processExpired;
+    }
+
+    /**
+     * @return true if expired messages may be sent to the dead letter queue
+     */
+    public boolean isProcessExpired(){
+        return this.processExpired;
+    }
+
+    /**
+     * @param processExpired false to keep expired messages out of the dead letter queue
+     */
+    public void setProcessExpired(boolean processExpired){
+        this.processExpired=processExpired;
+    }
+
+    /**
+     * @return true if non-persistent messages may be sent to the dead letter queue
+     */
+    public boolean isProcessNonPersistent(){
+        return this.processNonPersistent;
+    }
+
+    /**
+     * @param processNonPersistent false to keep non-persistent messages out of the dead letter queue
+     */
+    public void setProcessNonPersistent(boolean processNonPersistent){
+        this.processNonPersistent=processNonPersistent;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java Fri Jul 20 10:08:10 2007
@@ -18,6 +18,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
 
 /**
  * A strategy for choosing which destination is used for dead letter queue messages.
@@ -25,6 +26,14 @@
  * @version $Revision$
  */
 public interface DeadLetterStrategy {
+    
+    /**
+     * Allows a pluggable strategy to decide whether a message should be sent to a dead letter queue;
+     * for example, you might want to ignore expired or non-persistent messages
+     * @param message
+     * @return true if message should be sent to a dead letter queue
+     */
+    public boolean isSendToDeadLetterQueue(Message message);
 
     /**
      * Returns the dead letter queue for the given destination.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java Fri Jul 20 10:08:10 2007
@@ -29,7 +29,7 @@
  * 
  * @version $Revision$
  */
-public class IndividualDeadLetterStrategy implements DeadLetterStrategy {
+public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
 
     private String topicPrefix = "ActiveMQ.DLQ.Topic.";
     private String queuePrefix = "ActiveMQ.DLQ.Queue.";

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java Fri Jul 20 10:08:10 2007
@@ -29,7 +29,7 @@
  * 
  * @version $Revision$
  */
-public class SharedDeadLetterStrategy implements DeadLetterStrategy {
+public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
 
     private ActiveMQDestination deadLetterQueue = new ActiveMQQueue("ActiveMQ.DLQ");
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Fri Jul 20 10:08:10 2007
@@ -19,6 +19,7 @@
 package org.apache.activemq.broker;
 
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -243,5 +244,15 @@
     
     public BrokerService getBrokerService(){
         return null;
+    }
+
+    public void messageExpired(ConnectionContext context,MessageReference messageReference){ // no-op in this stub
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference) { // no-op in this stub
+    }
+
+    public Broker getRoot(){
+        return this; // the stub returns itself as the root broker
     }
 }