You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/22 15:28:13 UTC

svn commit: r614206 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: jmx/ region/ region/cursors/ region/policy/

Author: rajdavies
Date: Tue Jan 22 06:28:10 2008
New Revision: 614206

URL: http://svn.apache.org/viewvc?rev=614206&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1562

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Tue Jan 22 06:28:10 2008
@@ -93,7 +93,7 @@
         return broker.getDestinationStatistics().getMessagesCached().getCount();
     }
 
-    public int getMemoryPercentageUsed() {
+    public int getMemoryPercentUsage() {
         return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
     }
 
@@ -109,16 +109,16 @@
         return brokerService.getSystemUsage().getStoreUsage().getLimit();
     }
 
-    public int getStorePercentageUsed() {
+    public int getStorePercentUsage() {
         return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
     }
 
  
-    public long getTmpLimit() {
+    public long getTempLimit() {
        return brokerService.getSystemUsage().getTempUsage().getLimit();
     }
 
-    public int getTmpPercentageUsed() {
+    public int getTempPercentUsage() {
        return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
     }
 
@@ -126,7 +126,7 @@
         brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
     }
 
-    public void setTmpLimit(long limit) {
+    public void setTempLimit(long limit) {
         brokerService.getSystemUsage().getTempUsage().setLimit(limit);
     }
     
@@ -172,7 +172,7 @@
     }
 
     public ObjectName[] getTopicSubscribers() {
-        return broker.getTemporaryTopicSubscribers();
+        return broker.getTopicSubscribers();
     }
 
     public ObjectName[] getDurableTopicSubscribers() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Tue Jan 22 06:28:10 2008
@@ -61,23 +61,23 @@
 
     long getTotalMessageCount();
 
-    int getMemoryPercentageUsed();
+    int getMemoryPercentUsage();
 
     long getMemoryLimit();
 
     void setMemoryLimit(long limit);
         
-    int getStorePercentageUsed();
+    int getStorePercentUsage();
 
     long getStoreLimit();
 
     void setStoreLimit(long limit);
     
-    int getTmpPercentageUsed();
+    int getTempPercentUsage();
 
-    long getTmpLimit();
+    long getTempLimit();
 
-    void setTmpLimit(long limit);
+    void setTempLimit(long limit);
     
     boolean isPersistent();
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Tue Jan 22 06:28:10 2008
@@ -93,7 +93,7 @@
         return destination.getDestinationStatistics().getMessagesCached().getCount();
     }
 
-    public int getMemoryPercentageUsed() {
+    public int getMemoryPercentUsage() {
         return destination.getMemoryUsage().getPercentUsage();
     }
 
@@ -294,7 +294,7 @@
      }
 
     
-    public float getMemoryLimitPortion() {
+    public float getMemoryUsagePortion() {
         return destination.getMemoryUsage().getUsagePortion();
     }
 
@@ -306,7 +306,7 @@
        return destination.isProducerFlowControl();
     }
     
-    public void setMemoryLimitPortion(float value) {
+    public void setMemoryUsagePortion(float value) {
         destination.getMemoryUsage().setUsagePortion(value);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Tue Jan 22 06:28:10 2008
@@ -127,7 +127,7 @@
     /**
      * @return the percentage of amount of memory used
      */
-    int getMemoryPercentageUsed();
+    int getMemoryPercentUsage();
 
     /**
      * @return the amount of memory allocated to this destination
@@ -143,13 +143,13 @@
     /**
      * @return the portion of memory from the broker memory limit for this destination
      */
-    float getMemoryLimitPortion();
+    float getMemoryUsagePortion();
     
     /**
      * set the portion of memory from the broker memory limit for this destination
      * @param value
      */
-    void setMemoryLimitPortion(float value);
+    void setMemoryUsagePortion(float value);
 
     /**
      * Browses the current destination returning a list of messages

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Jan 22 06:28:10 2008
@@ -139,10 +139,7 @@
         return destination;
     }
 
-    public final String getDestination() {
-        return destination.getPhysicalName();
-    }
-    
+        
     public final String getName() {
         return getActiveMQDestination().getPhysicalName();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Tue Jan 22 06:28:10 2008
@@ -76,7 +76,7 @@
         if (destination.isQueue()) {
             if (destination.isTemporary()) {
                 final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
-                return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory, broker.getTempDataStore()) {
+                return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
 
                     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
                         // Only consumers on the same connection can consume
@@ -90,7 +90,7 @@
                 };
             } else {
                 MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
-                Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory, broker.getTempDataStore());
+                Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
                 configureQueue(queue, destination);
                 queue.initialize();
                 return queue;
@@ -127,7 +127,7 @@
         if (broker.getDestinationPolicy() != null) {
             PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
             if (entry != null) {
-                entry.configure(queue, broker.getTempDataStore());
+                entry.configure(broker,queue);
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Jan 22 06:28:10 2008
@@ -52,7 +52,7 @@
     public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
         throws JMSException {
         super(broker,usageManager, context, info);
-        this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
+        this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
         this.pending.setSystemUsage(usageManager);
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
@@ -218,17 +218,10 @@
         node.decrementReferenceCount();
     }
 
-    public String getSubscriptionName() {
-        return subscriptionKey.getSubscriptionName();
-    }
-
+    
     public synchronized String toString() {
         return "DurableTopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
                + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
-    }
-
-    public String getClientId() {
-        return subscriptionKey.getClientId();
     }
 
     public SubscriptionKey getSubscriptionKey() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jan 22 06:28:10 2008
@@ -95,14 +95,14 @@
         };
     };
     
-    public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage, MessageStore store, DestinationStatistics parentStats,
-                 TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
+    public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
+                 TaskRunnerFactory taskFactory) throws Exception {
         super(broker, store, destination,systemUsage, parentStats);
         
-        if (destination.isTemporary() || tmpStore==null ) {
+        if (destination.isTemporary() || broker == null || store==null ) {
             this.messages = new VMPendingMessageCursor();
         } else {
-            this.messages = new StoreQueueCursor(this, tmpStore);
+            this.messages = new StoreQueueCursor(broker,this);
         }
 
         this.taskRunner = taskFactory.createTaskRunner(this, "Queue  " + destination.getPhysicalName());
@@ -318,11 +318,11 @@
         final ConnectionContext context = producerExchange.getConnectionContext();
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
-
+        message.setRegionDestination(this);
         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
         final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
         if (message.isExpired()) {
-            broker.messageExpired(context, message);
+            broker.getRoot().messageExpired(context, message);
             //message not added to stats yet
             //destinationStatistics.getMessages().decrement();
             if (sendProducerAck) {
@@ -402,6 +402,7 @@
                     if (log.isDebugEnabled()) {
                         log.debug("Expired message: " + message);
                     }
+                    broker.getRoot().messageExpired(context, message);
                     return;
                 }
             }
@@ -416,7 +417,6 @@
     void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         synchronized (sendLock) {
-            message.setRegionDestination(this);
             if (store != null && message.isPersistent()) {
                 while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
                     if (context.getStopping().get()) {
@@ -678,11 +678,7 @@
 
                     // We should only delete messages that can be locked.
                     if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
-                        MessageAck ack = new MessageAck();
-                        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-                        ack.setDestination(destination);
-                        ack.setMessageID(r.getMessageId());
-                        removeMessage(c, null, r, ack);
+                        removeMessage(c,(IndirectMessageReference) r);
                     }
                 } catch (IOException e) {
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Jan 22 06:28:10 2008
@@ -705,14 +705,18 @@
 							        deadLetterDestination);
 							sent=true;
 						}
+					}else {
+					  //don't want to warn about failing to send 
+					  // if there isn't a dead letter strategy 
+					   sent=true;
 					}
 				}
 			}
 			if(sent==false){
-				LOG.warn("Failed to send "+node+" to dead letter queue");
+				LOG.warn("Failed to send "+node+" to DLQ");
 			}
 		}catch(Exception e){
-			LOG.warn("Failed to pass expired message to dead letter queue",e);
+			LOG.warn("Caught an exception sending to DLQ: "+node,e);
 		}
 	}
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Tue Jan 22 06:28:10 2008
@@ -41,7 +41,7 @@
 
     protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
         final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
-        return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory, null) {
+        return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) {
 
             public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Jan 22 06:28:10 2008
@@ -184,8 +184,8 @@
             }
 
             // Recover the durable subscription.
-            String clientId = subscription.getClientId();
-            String subscriptionName = subscription.getSubscriptionName();
+            String clientId = subscription.getSubscriptionKey().getClientId();
+            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
             String selector = subscription.getConsumerInfo().getSelector();
             SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
             if (info != null) {
@@ -435,7 +435,8 @@
     public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
         if (topicStore != null && node.isPersistent()) {
             DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
-            topicStore.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId());
+            SubscriptionKey key = dsub.getSubscriptionKey();
+            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Tue Jan 22 06:28:10 2008
@@ -68,11 +68,10 @@
         super(broker, context, info);
         this.usageManager = usageManager;
         String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
-        Store tempDataStore = broker.getTempDataStore();
-        if (tempDataStore != null) {
-            this.matched = new FilePendingMessageCursor(matchedName, tempDataStore);
-        } else {
+        if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
             this.matched = new VMPendingMessageCursor();
+        } else {
+            this.matched = new FilePendingMessageCursor(broker,matchedName);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Tue Jan 22 06:28:10 2008
@@ -21,6 +21,9 @@
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.QueueMessageReference;
@@ -32,6 +35,8 @@
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
@@ -40,14 +45,14 @@
  * @version $Revision$
  */
 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
-
+    private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
     private static final AtomicLong NAME_COUNT = new AtomicLong();
-    
+    protected Broker broker;
     private Store store;
     private String name;
     private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
     private ListContainer<MessageReference> diskList;
-    private Iterator iter;
+    private Iterator<MessageReference> iter;
     private Destination regionDestination;
     private boolean iterating;
     private boolean flushRequired;
@@ -58,9 +63,10 @@
      * @param name
      * @param store
      */
-    public FilePendingMessageCursor(String name, Store store) {
+    public FilePendingMessageCursor(Broker broker,String name) {
+        this.broker = broker;
+        this.store= broker.getTempDataStore();
         this.name = NAME_COUNT.incrementAndGet() + "_" + name;
-        this.store = store;
     }
 
     public void start() throws Exception {
@@ -157,19 +163,39 @@
      * @param node
      */
     public synchronized void addMessageLast(MessageReference node) {
-        try {
-            regionDestination = node.getMessage().getRegionDestination();
-            if (isSpaceInMemoryList()) {
-                memoryList.add(node);
-                node.incrementReferenceCount();
-            } else {
-                flushToDisk();
-                node.decrementReferenceCount();
+        if (!node.isExpired()) {
+            try {
+                regionDestination = node.getMessage().getRegionDestination();
+                if (isDiskListEmpty()) {
+                    if (hasSpace()) {
+                        memoryList.add(node);
+                        node.incrementReferenceCount();
+                        return;
+                    }
+                }
+                if (!hasSpace()) {
+                    if (isDiskListEmpty()) {
+                        expireOldMessages();
+                        if (hasSpace()) {
+                            memoryList.add(node);
+                            node.incrementReferenceCount();
+                            return;
+                        } else {
+                            flushToDisk();
+                        }
+                    }
+                }
                 systemUsage.getTempUsage().waitForSpace();
-                getDiskList().addLast(node);
+                node.decrementReferenceCount();
+                getDiskList().add(node);
+
+            } catch (Exception e) {
+                LOG.error("Caught an Exception adding a message: " + node
+                        + " first to FilePendingMessageCursor ", e);
+                throw new RuntimeException(e);
             }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+        } else {
+            discard(node);
         }
     }
 
@@ -179,19 +205,39 @@
      * @param node
      */
     public synchronized void addMessageFirst(MessageReference node) {
-        try {
-            regionDestination = node.getMessage().getRegionDestination();
-            if (isSpaceInMemoryList()) {
-                memoryList.addFirst(node);
-                node.incrementReferenceCount();
-            } else {
-                flushToDisk();
+        if (!node.isExpired()) {
+            try {
+                regionDestination = node.getMessage().getRegionDestination();
+                if (isDiskListEmpty()) {
+                    if (hasSpace()) {
+                        memoryList.addFirst(node);
+                        node.incrementReferenceCount();
+                        return;
+                    }
+                }
+                if (!hasSpace()) {
+                    if (isDiskListEmpty()) {
+                        expireOldMessages();
+                        if (hasSpace()) {
+                            memoryList.addFirst(node);
+                            node.incrementReferenceCount();
+                            return;
+                        } else {
+                            flushToDisk();
+                        }
+                    }
+                }
                 systemUsage.getTempUsage().waitForSpace();
                 node.decrementReferenceCount();
                 getDiskList().addFirst(node);
+
+            } catch (Exception e) {
+                LOG.error("Caught an Exception adding a message: " + node
+                        + " first to FilePendingMessageCursor ", e);
+                throw new RuntimeException(e);
             }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+        } else {
+            discard(node);
         }
     }
 
@@ -271,13 +317,17 @@
         super.setSystemUsage(usageManager);
     }
 
-    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
+    public void onUsageChanged(Usage usage, int oldPercentUsage,
+            int newPercentUsage) {
         if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
             synchronized (this) {
                 flushRequired = true;
                 if (!iterating) {
-                    flushToDisk();
-                    flushRequired = false;
+                    expireOldMessages();
+                    if (!hasSpace()) {
+                        flushToDisk();
+                        flushRequired = false;
+                    }
                 }
             }
         }
@@ -290,8 +340,25 @@
     protected boolean isSpaceInMemoryList() {
         return hasSpace() && isDiskListEmpty();
     }
+    
+    protected synchronized void expireOldMessages() {
+        if (!memoryList.isEmpty()) {
+            LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
+            this.memoryList = new LinkedList<MessageReference>();
+            while (!tmpList.isEmpty()) {
+                MessageReference node = tmpList.removeFirst();
+                if (node.isExpired()) {
+                    discard(node);
+                }else {
+                    memoryList.add(node);
+                }               
+            }
+        }
+
+    }
 
     protected synchronized void flushToDisk() {
+       
         if (!memoryList.isEmpty()) {
             while (!memoryList.isEmpty()) {
                 MessageReference node = memoryList.removeFirst();
@@ -312,10 +379,18 @@
                 diskList = store.getListContainer(name, "TopicSubscription", true);
                 diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
             } catch (IOException e) {
-                e.printStackTrace();
+                LOG.error("Caught an IO Exception getting the DiskList ",e);
                 throw new RuntimeException(e);
             }
         }
         return diskList;
+    }
+    
+    protected void discard(MessageReference message) {
+        message.decrementReferenceCount();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Discarding message " + message);
+        }
+        broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(), message);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Tue Jan 22 06:28:10 2008
@@ -94,7 +94,7 @@
     }
 
     public synchronized void addMessageLast(MessageReference node) throws Exception {
-        if (cacheEnabled && !isFull()) {
+        if (cacheEnabled && hasSpace()) {
             //optimization - A persistent queue will add the message to
             //to store then retrieve it again from the store.
             recoverMessage(node.getMessage());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Jan 22 06:28:10 2008
@@ -23,13 +23,13 @@
 import java.util.Map;
 
 import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.kaha.Store;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,16 +53,19 @@
     private final Subscription subscription;
 
     /**
+     * @param broker 
      * @param topic
      * @param clientId
      * @param subscriberName
+     * @param maxBatchSize 
+     * @param subscription 
      * @throws IOException
      */
-    public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize, Subscription subscription) {
+    public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
         this.clientId = clientId;
         this.subscriberName = subscriberName;
         this.subscription = subscription;
-        this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store);
+        this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName);
         storePrefetches.add(nonPersistent);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Tue Jan 22 06:28:10 2008
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.cursors;
 
 import org.apache.activemq.ActiveMQMessageAudit;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
@@ -33,9 +34,9 @@
 public class StoreQueueCursor extends AbstractPendingMessageCursor {
 
     private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class);
+    private Broker broker;
     private int pendingCount;
     private Queue queue;
-    private Store tmpStore;
     private PendingMessageCursor nonPersistent;
     private QueueStorePrefetch persistent;
     private boolean started;
@@ -47,9 +48,9 @@
      * @param queue
      * @param tmpStore
      */
-    public StoreQueueCursor(Queue queue, Store tmpStore) {
+    public StoreQueueCursor(Broker broker,Queue queue) {
+        this.broker=broker;
         this.queue = queue;
-        this.tmpStore = tmpStore;
         this.persistent = new QueueStorePrefetch(queue);
         currentCursor = persistent;
     }
@@ -58,7 +59,7 @@
         started = true;
         super.start();
         if (nonPersistent == null) {
-            nonPersistent = new FilePendingMessageCursor(queue.getDestination(), tmpStore);
+            nonPersistent = new FilePendingMessageCursor(broker,queue.getName());
             nonPersistent.setMaxBatchSize(getMaxBatchSize());
             nonPersistent.setSystemUsage(systemUsage);
             nonPersistent.setEnableAudit(isEnableAudit());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
 
 /**
  * Creates a PendIngMessageCursor for Durable subscribers *
@@ -33,14 +33,15 @@
 
     /**
      * Retrieve the configured pending message storage cursor;
+     * @param broker 
      * 
      * @param clientId
      * @param name
-     * @param tmpStorage
      * @param maxBatchSize
+     * @param sub 
      * @return the Pending Message cursor
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
-        return new FilePendingMessageCursor(name, tmpStorage);
+    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
+        return new FilePendingMessageCursor(broker,name);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
 
 /**
  * Creates a FilePendingMessageCursor *
@@ -32,14 +32,14 @@
 public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
 
     /**
+     * @param broker 
      * @param queue
-     * @param tmpStore
      * @return the cursor
      * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
      *      org.apache.activemq.kaha.Store)
      */
-    public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
-        return new FilePendingMessageCursor("PendingCursor:" + queue.getName(), tmpStore);
+    public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
+        return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName());
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
 
 /**
  * Creates a PendIngMessageCursor for Durable subscribers *
@@ -31,15 +31,14 @@
 public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
 
     /**
+     * @param broker 
      * @param name
-     * @param tmpStorage
      * @param maxBatchSize
      * @return a Cursor
      * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
      *      org.apache.activemq.kaha.Store, int)
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage,
-                                                                  int maxBatchSize) {
-        return new FilePendingMessageCursor("PendingCursor:" + name, tmpStorage);
+    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
+        return new FilePendingMessageCursor(broker,"PendingCursor:" + name);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
 
 /**
  * Abstraction to allow different policies for holding messages awaiting
@@ -30,12 +30,13 @@
 
     /**
      * Retrieve the configured pending message storage cursor;
+     * @param broker 
      * 
      * @param clientId
      * @param name
-     * @param tmpStorage
      * @param maxBatchSize
+     * @param sub 
      * @return the Pending Message cursor
      */
-    PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub);
+    PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
 
 /**
  * Abstraction to allow different policies for holding messages awaiting
@@ -30,10 +30,10 @@
 
     /**
      * Retrieve the configured pending message storage cursor;
+     * @param broker 
      * 
      * @param queue
-     * @param tmpStore
      * @return the cursor
      */
-    PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore);
+    PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
 
 /**
  * Abstraction to allow different policies for holding messages awaiting
@@ -29,11 +29,11 @@
 
     /**
      * Retrieve the configured pending message storage cursor;
+     * @param broker 
      * 
      * @param name
-     * @param tmpStorage
      * @param maxBatchSize
      * @return the Pending Message cursor
      */
-    PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage, int maxBatchSize);
+    PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Tue Jan 22 06:28:10 2008
@@ -25,7 +25,6 @@
 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.kaha.Store;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,7 +57,7 @@
     private boolean producerFlowControl = true;
     private boolean optimizedDispatch=false;
    
-    public void configure(Queue queue, Store tmpStore) {
+    public void configure(Broker broker,Queue queue) {
         if (dispatchPolicy != null) {
             queue.setDispatchPolicy(dispatchPolicy);
         }
@@ -70,7 +69,7 @@
             queue.getMemoryUsage().setLimit(memoryLimit);
         }
         if (pendingQueuePolicy != null) {
-            PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore);
+            PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
             queue.setMessages(messages);
         }
         queue.setProducerFlowControl(isProducerFlowControl());
@@ -121,16 +120,16 @@
         if (pendingSubscriberPolicy != null) {
             String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
             int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
-            subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(name, broker.getTempDataStore(), maxBatchSize));
+            subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
         }
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {
-        String clientId = sub.getClientId();
-        String subName = sub.getSubscriptionName();
+        String clientId = sub.getSubscriptionKey().getClientId();
+        String subName = sub.getSubscriptionKey().getSubscriptionName();
         int prefetch = sub.getPrefetchSize();
         if (pendingDurableSubscriberPolicy != null) {
-            PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch, sub);
+            PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,prefetch,sub);
             cursor.setSystemUsage(memoryManager);
             sub.setPending(cursor);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
@@ -34,14 +35,15 @@
 
     /**
      * Retrieve the configured pending message storage cursor;
+     * @param broker 
      * 
      * @param clientId
      * @param name
-     * @param tmpStorage
      * @param maxBatchSize
+     * @param sub 
      * @return the Pending Message cursor
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
-        return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize, sub);
+    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
+        return new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
-import org.apache.activemq.kaha.Store;
 
 /**
  * Creates a StoreQueueCursor *
@@ -32,14 +32,14 @@
 public class StorePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
 
     /**
+     * @param broker 
      * @param queue
-     * @param tmpStore
      * @return the cursor
      * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
      *      org.apache.activemq.kaha.Store)
      */
-    public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
-        return new StoreQueueCursor(queue, tmpStore);
+    public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
+        return new StoreQueueCursor(broker,queue);
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
-import org.apache.activemq.kaha.Store;
 
 /**
  * Creates a VMPendingMessageCursor *
@@ -32,14 +32,14 @@
 
     /**
      * Retrieve the configured pending message storage cursor;
-     * 
+     * @param broker 
      * @param clientId
      * @param name
-     * @param tmpStorage
      * @param maxBatchSize
+     * @param sub 
      * @return the Pending Message cursor
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
+    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) {
         return new VMPendingMessageCursor();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,10 +16,10 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
-import org.apache.activemq.kaha.Store;
 
 /**
  * Creates a VMPendingMessageCursor *
@@ -32,11 +32,11 @@
 public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
 
     /**
+     * @param broker 
      * @param queue
-     * @param tmpStore
      * @return the cursor
      */
-    public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
+    public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
         return new VMPendingMessageCursor();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
-import org.apache.activemq.kaha.Store;
 
 /**
  * Creates a VMPendingMessageCursor *
@@ -31,15 +31,14 @@
 public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
 
     /**
+     * @param broker
      * @param name
-     * @param tmpStorage
      * @param maxBatchSize
      * @return a Cursor
      * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
      *      org.apache.activemq.kaha.Store, int)
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage,
-                                                                  int maxBatchSize) {
+    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
         return new VMPendingMessageCursor();
     }
 }