You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/09/12 12:07:35 UTC

svn commit: r442550 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/ main/java/org/apache/activemq/store/jdbc/ main/...

Author: rajdavies
Date: Tue Sep 12 03:07:34 2006
New Revision: 442550

URL: http://svn.apache.org/viewvc?view=rev&rev=442550
Log:
More foundation work to resolve: http://issues.apache.org/activemq/browse/AMQ-845

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Tue Sep 12 03:07:34 2006
@@ -155,4 +155,8 @@
     public int getPrefetchSize() {
         return info.getPrefetchSize();
     }
+    
+    public boolean isRecoveryRequired(){
+        return true;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Sep 12 03:07:34 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -40,7 +41,8 @@
     private boolean active=false;
     
     public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
-        //super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
+        //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName()));
+       // super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
         super(broker,context,info);
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
@@ -78,12 +80,14 @@
                     topic.activate(context, this);
                 }
             }
+            pending.start();
             dispatchMatched();
         }
     }
 
     synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {        
         active=false;
+        pending.stop();
         if( !keepDurableSubsActive ) {
             for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
                 Topic topic = (Topic) iter.next();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Sep 12 03:07:34 2006
@@ -188,7 +188,7 @@
                         if(context.isInTransaction()) {
                             // extend prefetch window only if not a pulling consumer
                             if (getPrefetchSize() != 0) {
-                                prefetchExtension=Math.max(prefetchExtension,index+1);
+                            prefetchExtension=Math.max(prefetchExtension,index+1);
                             }
                         }
                         else {
@@ -316,6 +316,10 @@
         return enqueueCounter;
     }
     
+    public boolean isRecoveryRequired(){
+        return pending.isRecoveryRequired();
+    }
+    
     /**
      * optimize message consumer prefetch if the consumer supports it
      *
@@ -336,7 +340,16 @@
         */
     }
     
-    
+    public void add(ConnectionContext context, Destination destination) throws Exception {
+        super.add(context,destination);
+        pending.add(context,destination);
+    }
+
+    public void remove(ConnectionContext context, Destination destination) throws Exception {
+        super.remove(context,destination);
+        pending.remove(context,destination);
+       
+    }
 
 
     protected void dispatchMatched() throws IOException{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Tue Sep 12 03:07:34 2006
@@ -190,5 +190,13 @@
      * @return the prefetch size that is configured for the subscription
      */
     int getPrefetchSize();
+    
+    /**
+     * Informs the Broker if the subscription needs intervention to recover its state,
+     * e.g. a DurableTopicSubscriber may need it
+     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
+     * @return true if recovery required
+     */
+    public boolean isRecoveryRequired();
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Sep 12 03:07:34 2006
@@ -180,31 +180,30 @@
     
             final MessageEvaluationContext msgContext = new MessageEvaluationContext();
             msgContext.setDestination(destination);
-            store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
-                public void recoverMessage(Message message) throws Exception {
-                    message.setRegionDestination(Topic.this);
-                    try {
-                        msgContext.setMessageReference(message);
-                        if (subscription.matches(message, msgContext)) {
-                            subscription.add(message);
+            if(subscription.isRecoveryRequired()){
+                store.recoverSubscription(clientId,subscriptionName,new MessageRecoveryListener(){
+                    public void recoverMessage(Message message) throws Exception{
+                        message.setRegionDestination(Topic.this);
+                        try{
+                            msgContext.setMessageReference(message);
+                            if(subscription.matches(message,msgContext)){
+                                subscription.add(message);
+                            }
+                        }catch(InterruptedException e){
+                            Thread.currentThread().interrupt();
+                        }catch(IOException e){
+                            // TODO: Need to handle this better.
+                            e.printStackTrace();
                         }
                     }
-                    catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
+
+                    public void recoverMessageReference(String messageReference) throws Exception{
+                        throw new RuntimeException("Should not be called.");
                     }
-                    catch (IOException e) {
-                        // TODO: Need to handle this better.
-                        e.printStackTrace();
-                    }
-                }
-    
-                public void recoverMessageReference(String messageReference) throws Exception {
-                    throw new RuntimeException("Should not be called.");
-                }
-                
-                public void finished(){
-                }
-            });
+
+                    public void finished(){}
+                });
+            }
             
             if( true && subscription.getConsumerInfo().isRetroactive() ) {
                 // If nothing was in the persistent store, then try to use the recovery policy.

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Sep 12 03:07:34 2006
@@ -24,6 +24,7 @@
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -216,6 +217,9 @@
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
         if (info.isDurable()) {
+            if (AdvisorySupport.isAdvisoryTopic(info.getDestination())){
+                throw new JMSException("Cannot create a durable subscription for an advisory Topic");
+            }
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
             DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
             if (sub == null) {

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=auto&rev=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Tue Sep 12 03:07:34 2006
@@ -0,0 +1,43 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+
+
+/**
+ * Default method holder for pending message (messages awaiting dispatch to a consumer) cursor
+ * 
+ * @version $Revision$
+ */
+public abstract class AbstractPendingMessageCursor implements  PendingMessageCursor{
+    
+    public void start() throws Exception{
+    }
+    
+    public void stop() throws Exception{
+    }
+    
+    public void add(ConnectionContext context, Destination destination) throws Exception{
+    }
+
+    public void remove(ConnectionContext context, Destination destination) throws Exception{
+    }
+    
+    
+    public boolean isRecoveryRequired(){
+        return true;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Tue Sep 12 03:07:34 2006
@@ -12,12 +12,14 @@
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.broker.region.cursors;
+
 import java.io.IOException;
-import java.util.*;
+import java.util.Iterator;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.kaha.*;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.Store;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.kahadaptor.CommandMarshaller;
 /**
@@ -25,25 +27,26 @@
  * 
  * @version $Revision$
  */
-public class FilePendingMessageCursor implements PendingMessageCursor{
+public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
     private ListContainer list;
-    private Iterator iter = null;
+    private Iterator iter=null;
     private Destination regionDestination;
-    
+
     /**
      * @param name
      * @param store
      * @throws IOException
      */
-    public FilePendingMessageCursor(String name, Store store) {
+    public FilePendingMessageCursor(String name,Store store){
         try{
-            list = store.getListContainer(name);
+            list=store.getListContainer(name);
             list.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
             list.setMaximumCacheSize(0);
         }catch(IOException e){
             throw new RuntimeException(e);
         }
     }
+
     /**
      * @return true if there are no pending messages
      */
@@ -53,12 +56,12 @@
 
     /**
      * reset the cursor
-     *
+     * 
      */
     public void reset(){
-        iter = list.listIterator();
+        iter=list.listIterator();
     }
-    
+
     /**
      * add message to await dispatch
      * 
@@ -66,42 +69,42 @@
      */
     public void addMessageLast(MessageReference node){
         try{
-            regionDestination = node.getMessage().getRegionDestination();
+            regionDestination=node.getMessage().getRegionDestination();
             node.decrementReferenceCount();
         }catch(IOException e){
-           throw new RuntimeException(e);
+            throw new RuntimeException(e);
         }
         list.addLast(node);
     }
-    
+
     /**
      * add message to await dispatch
-     * @param position 
+     * 
+     * @param position
      * @param node
      */
     public void addMessageFirst(MessageReference node){
         try{
-            regionDestination = node.getMessage().getRegionDestination();
+            regionDestination=node.getMessage().getRegionDestination();
             node.decrementReferenceCount();
         }catch(IOException e){
-           throw new RuntimeException(e);
+            throw new RuntimeException(e);
         }
         list.addFirst(node);
     }
 
-
     /**
      * @return true if there pending messages to dispatch
      */
     public boolean hasNext(){
-       return iter.hasNext();
+        return iter.hasNext();
     }
 
     /**
      * @return the next pending message
      */
     public MessageReference next(){
-        Message message = (Message) iter.next();
+        Message message=(Message) iter.next();
         message.setRegionDestination(regionDestination);
         message.incrementReferenceCount();
         return message;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Tue Sep 12 03:07:34 2006
@@ -13,6 +13,9 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 
 /**
@@ -20,7 +23,24 @@
  * 
  * @version $Revision$
  */
-public interface PendingMessageCursor{
+public interface PendingMessageCursor extends Service{
+    
+    
+    /**
+     * Add a destination
+     * @param context
+     * @param destination
+     * @throws Exception
+     */
+    public void add(ConnectionContext context, Destination destination) throws Exception;
+
+    /**
+     * remove a destination
+     * @param context
+     * @param destination
+     * @throws Exception
+     */
+    public void remove(ConnectionContext context, Destination destination) throws Exception;
     /**
      * @return true if there are no pending messages
      */
@@ -70,4 +90,12 @@
      * 
      */
     public void clear();
+    
+    /**
+     * Informs the Broker if the subscription needs intervention to recover its state,
+     * e.g. a DurableTopicSubscriber may need it
+     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
+     * @return true if recovery required
+     */
+    public boolean isRecoveryRequired();
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=auto&rev=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Sep 12 03:07:34 2006
@@ -0,0 +1,180 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+/**
+ * Persistent pending message cursor (for messages awaiting dispatch to a consumer)
+ * 
+ * @version $Revision$
+ */
+public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
+    static private final Log log=LogFactory.getLog(StoreDurableSubscriberCursor.class);
+    private int pendingCount=0;
+    private String clientId;
+    private String subscriberName;
+    private int maxBatchSize=10;
+    private LinkedList batchList=new LinkedList();
+    private Map topics=new HashMap();
+    private LinkedList storePrefetches=new LinkedList();
+    private AtomicBoolean started=new AtomicBoolean();
+
+    /**
+     * @param topic
+     * @param clientId
+     * @param subscriberName
+     * @throws IOException
+     */
+    public StoreDurableSubscriberCursor(String clientId,String subscriberName){
+        this.clientId=clientId;
+        this.subscriberName=subscriberName;
+    }
+
+    public synchronized void start() throws Exception{
+        started.set(true);
+        for(Iterator i=storePrefetches.iterator();i.hasNext();){
+            TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
+            tsp.start();
+            pendingCount+=tsp.size();
+        }
+    }
+
+    public synchronized void stop() throws Exception{
+        started.set(false);
+        for(Iterator i=storePrefetches.iterator();i.hasNext();){
+            TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
+            tsp.stop();
+        }
+        pendingCount=0;
+    }
+
+    /**
+     * Add a destination
+     * 
+     * @param context
+     * @param destination
+     * @throws Exception
+     */
+    public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
+        TopicStorePrefetch tsp=new TopicStorePrefetch((Topic) destination,batchList,clientId,subscriberName);
+        topics.put(destination,tsp);
+        storePrefetches.add(tsp);
+        if(started.get()){
+            tsp.start();
+            pendingCount+=tsp.size();
+        }
+    }
+
+    /**
+     * remove a destination
+     * 
+     * @param context
+     * @param destination
+     * @throws Exception
+     */
+    public synchronized void remove(ConnectionContext context,Destination destination) throws Exception{
+        TopicStorePrefetch tsp=(TopicStorePrefetch) topics.remove(destination);
+        if(tsp!=null){
+            storePrefetches.remove(tsp);
+        }
+    }
+
+    /**
+     * @return true if there are no pending messages
+     */
+    public synchronized boolean isEmpty(){
+        return pendingCount<=0;
+    }
+
+    /**
+     * Informs the Broker if the subscription needs intervention to recover its state, e.g. a DurableTopicSubscriber
+     * may need it
+     * 
+     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
+     * @return true if recovery required
+     */
+    public boolean isRecoveryRequired(){
+        return false;
+    }
+
+    public synchronized void addMessageFirst(MessageReference node){
+        pendingCount++;
+    }
+
+    public synchronized void addMessageLast(MessageReference node){
+        pendingCount++;
+    }
+
+    public void clear(){
+        pendingCount=0;
+    }
+
+    public synchronized boolean hasNext(){
+        return !isEmpty();
+    }
+
+    public synchronized MessageReference next(){
+        MessageReference result=null;
+        if(!isEmpty()){
+            if(batchList.isEmpty()){
+                try{
+                    fillBatch();
+                }catch(Exception e){
+                    log.error("Couldn't fill batch from store ",e);
+                    throw new RuntimeException(e);
+                }
+            }
+            if(!batchList.isEmpty()){
+                result=(MessageReference) batchList.removeFirst();
+            }
+        }
+        return result;
+    }
+
+    public synchronized void remove(){
+        pendingCount--;
+    }
+
+    public void reset(){
+        batchList.clear();
+    }
+
+    public int size(){
+        return pendingCount;
+    }
+
+    private synchronized void fillBatch() throws Exception{
+        for(Iterator i=storePrefetches.iterator();i.hasNext();){
+            TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
+            tsp.fillBatch();
+            if(batchList.size()>=maxBatchSize){
+                break;
+            }
+        }
+        // round-robin
+        Object obj=storePrefetches.removeFirst();
+        storePrefetches.addLast(obj);
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=auto&rev=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Tue Sep 12 03:07:34 2006
@@ -0,0 +1,156 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Pending message cursor (messages awaiting dispatch to a consumer) that reads a subscriber's persisted messages
+ * from the topic's message store in batches of at most maxBatchSize.
+ * <p/>
+ * The cursor only keeps a counter of pending messages; the messages themselves are appended to the batch list that
+ * was handed to the constructor when the store calls back through {@link MessageRecoveryListener}.
+ * 
+ * @version $Revision$
+ */
+class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{
+    static private final Log log=LogFactory.getLog(TopicStorePrefetch.class);
+    private Topic topic;
+    private TopicMessageStore store;
+    private LinkedList batchList;
+    private String clientId;
+    private String subscriberName;
+    private int pendingCount=0;
+    private MessageId lastMessageId;
+    private int maxBatchSize=10;
+
+    /**
+     * @param topic the topic whose message store is read; the store must be a TopicMessageStore
+     * @param batchList the list recovered messages are appended to (supplied by the creator)
+     * @param clientId
+     * @param subscriberName
+     */
+    public TopicStorePrefetch(Topic topic,LinkedList batchList,String clientId,String subscriberName){
+        this.topic=topic;
+        this.store=(TopicMessageStore) topic.getMessageStore();
+        this.batchList=batchList;
+        this.clientId=clientId;
+        this.subscriberName=subscriberName;
+    }
+
+    /**
+     * Initialises the pending counter from the store.
+     * 
+     * @throws Exception if the store cannot report the message count
+     */
+    public void start() throws Exception{
+        pendingCount=store.getMessageCount(clientId,subscriberName);
+        // diagnostic only - goes through the logger rather than straight to stderr
+        if(log.isDebugEnabled()){
+            log.debug("Pending count = "+pendingCount);
+        }
+    }
+
+    /**
+     * Forgets the pending count and the position reached in the store.
+     * 
+     * @throws Exception
+     */
+    public void stop() throws Exception{
+        pendingCount=0;
+        lastMessageId=null;
+    }
+
+    /**
+     * @return true if there are no pending messages
+     */
+    public boolean isEmpty(){
+        return pendingCount==0;
+    }
+
+    /**
+     * Informs the Broker if the subscription needs intervention to recover its state e.g. DurableTopicSubscriber
+     * may do
+     * 
+     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
+     * @return true if recovery required
+     */
+    public boolean isRecoveryRequired(){
+        return false;
+    }
+
+    /**
+     * Only bumps the pending counter; the reference itself is not stored by this cursor.
+     * 
+     * @param node
+     */
+    public synchronized void addMessageFirst(MessageReference node){
+        pendingCount++;
+    }
+
+    /**
+     * Only bumps the pending counter; the reference itself is not stored by this cursor.
+     * 
+     * @param node
+     */
+    public synchronized void addMessageLast(MessageReference node){
+        pendingCount++;
+    }
+
+    /**
+     * Forgets the pending count and the position reached in the store.
+     */
+    public void clear(){
+        pendingCount=0;
+        lastMessageId=null;
+    }
+
+    /**
+     * @return true while the pending counter is non-zero
+     */
+    public synchronized boolean hasNext(){
+        return !isEmpty();
+    }
+
+    /**
+     * Takes the next message from the head of the batch, loading a new batch from the store first when the batch is
+     * empty.
+     * 
+     * @return the next message, or null when nothing is pending or the store returned no messages
+     * @throws RuntimeException wrapping any failure to read from the store
+     */
+    public synchronized MessageReference next(){
+        if(isEmpty()){
+            return null;
+        }
+        if(batchList.isEmpty()){
+            try{
+                fillBatch();
+            }catch(Exception e){
+                log.error(topic.getDestination()+" Couldn't fill batch from store ",e);
+                throw new RuntimeException(e);
+            }
+        }
+        // The counter can be ahead of what the store actually handed back; removeFirst() on an empty list would
+        // throw NoSuchElementException, so report "nothing available" instead.
+        if(batchList.isEmpty()){
+            return null;
+        }
+        return (MessageReference) batchList.removeFirst();
+    }
+
+    /**
+     * Decrements the pending counter.
+     */
+    public synchronized void remove(){
+        pendingCount--;
+    }
+
+    /**
+     * Discards every entry currently held in the batch list.
+     */
+    public void reset(){
+        batchList.clear();
+    }
+
+    /**
+     * @return the current value of the pending message counter
+     */
+    public int size(){
+        return pendingCount;
+    }
+
+    // MessageRecoveryListener implementation
+    public void finished(){}
+
+    /**
+     * Store callback: appends the recovered message to the tail of the batch list.
+     * 
+     * @param message
+     * @throws Exception
+     */
+    public void recoverMessage(Message message) throws Exception{
+        batchList.addLast(message);
+    }
+
+    /**
+     * Message references are not supported by this cursor.
+     * 
+     * @param messageReference
+     * @throws Exception
+     */
+    public void recoverMessageReference(String messageReference) throws Exception{
+        // shouldn't get called
+        throw new RuntimeException("Not supported");
+    }
+
+    // implementation
+    /**
+     * Re-reads the pending count from the store when the local counter is not positive and, if anything is pending,
+     * asks the store for up to maxBatchSize messages after lastMessageId. The id of the last entry in the batch list
+     * is remembered as the starting point for the next call.
+     * 
+     * @throws Exception if the store fails
+     */
+    protected void fillBatch() throws Exception{
+        if(pendingCount<=0){
+            pendingCount=store.getMessageCount(clientId,subscriberName);
+        }
+        if(pendingCount>0){
+            store.recoverNextMessages(clientId,subscriberName,lastMessageId,maxBatchSize,this);
+            // this will add more messages to the batch list
+            if(!batchList.isEmpty()){
+                Message message=(Message) batchList.getLast();
+                lastMessageId=message.getMessageId();
+            }
+        }
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Tue Sep 12 03:07:34 2006
@@ -20,7 +20,7 @@
  * 
  * @version $Revision$
  */
-public class VMPendingMessageCursor implements PendingMessageCursor{
+public class VMPendingMessageCursor extends AbstractPendingMessageCursor{
     private LinkedList list = new LinkedList();
     private Iterator iter = null;
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -82,6 +82,14 @@
         delegate.recoverSubscription(clientId, subscriptionName, listener);
     }
     
+    /**
+     * Passes the call straight through to the delegate store with the same arguments.
+     */
+    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
+        delegate.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
+    }
+    
+    /**
+     * Passes the call straight through to the delegate store and returns its result.
+     */
+    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+        return delegate.getNextMessageToDeliver(clientId,subscriptionName);
+    }
+    
     public ActiveMQDestination getDestination() {
         return delegate.getDestination();
     }
@@ -100,4 +108,8 @@
     public void setUsageManager(UsageManager usageManager) {
         delegate.setUsageManager(usageManager);
     }
+
+    /**
+     * Passes the call straight through to the delegate store and returns its result.
+     */
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        return delegate.getMessageCount(clientId,subscriberName);
+    }    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -1,95 +1,134 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.store;
 
 import java.io.IOException;
-
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
-
 /**
  * A MessageStore for durable topic subscriptions
- *
+ * 
  * @version $Revision: 1.4 $
  */
-public interface TopicMessageStore extends MessageStore {
-
+public interface TopicMessageStore extends MessageStore{
     /**
-     * Stores the last acknowledged messgeID for the given subscription
-     * so that we can recover and commence dispatching messages from the last
-     * checkpoint
-     * @param context TODO
+     * Stores the last acknowledged messageID for the given subscription so that we can recover and commence dispatching
+     * messages from the last checkpoint
+     * 
+     * @param context
+     * @param clientId
+     * @param subscriptionName
      * @param messageId
      * @param subscriptionPersistentId
+     * @throws IOException
      */
-    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
+    public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
+                    throws IOException;
 
     /**
+     * @param clientId
+     * @param subscriptionName
      * @param sub
-     * @throws JMSException 
+     * @throws IOException
+     * @throws JMSException
      */
-    public void deleteSubscription(String clientId, String subscriptionName) throws IOException;
-    
+    public void deleteSubscription(String clientId,String subscriptionName) throws IOException;
+
     /**
-     * For the new subscription find the last acknowledged message ID
-     * and then find any new messages since then and dispatch them
-     * to the subscription.
-     * <p/>
-     * e.g. if we dispatched some messages to a new durable topic subscriber, then went down before
-     * acknowledging any messages, we need to know the correct point from which to recover from.
+     * For the new subscription find the last acknowledged message ID and then find any new messages since then and
+     * dispatch them to the subscription. <p/> e.g. if we dispatched some messages to a new durable topic subscriber,
+     * then went down before acknowledging any messages, we need to know the correct point from which to recover from.
+     * 
+     * @param clientId
+     * @param subscriptionName
+     * @param listener
      * @param subscription
-     *
-     * @throws Exception 
+     * 
+     * @throws Exception
      */
-    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception;
+    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
+                    throws Exception;
 
     /**
+     * For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId
+     * messageId <p/>
+     * 
+     * @param clientId
+     * @param subscriptionName
+     * @param lastMessageId
+     * @param maxReturned
+     * @param listener
+     * 
+     * @throws Exception
+     */
+    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+                    MessageRecoveryListener listener) throws Exception;
+
+    
+    /**
+     * Get the next un-acknowledged message to deliver to a subscriber
+     * @param clientId
+     * @param subscriptionName
+     * @return the next message or null
+     * @throws IOException 
+     */
+    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException;
+    
+    
+    /**
+     * Get the number of messages ready to deliver from the store to a durable subscriber
+     * @param clientId
+     * @param subscriberName
+     * @return the outstanding message count
+     * @throws IOException
+     */
+    public int getMessageCount(String clientId,String subscriberName) throws IOException;
+    
+    /**
      * Finds the subscriber entry for the given consumer info
      * 
-     * @param clientId TODO
-     * @param subscriptionName TODO
-     * @return
+     * @param clientId
+     * @param subscriptionName
+     * @return the SubscriptionInfo
+     * @throws IOException
      */
-    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
+    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException;
 
     /**
      * Lists all the durable subscirptions for a given destination.
      * 
-     * @param clientId TODO
-     * @param subscriptionName TODO
-     * @return
+     * @return an array SubscriptionInfos
+     * @throws IOException
      */
     public SubscriptionInfo[] getAllSubscriptions() throws IOException;
 
     /**
-     * Inserts the subscriber info due to a subscription change
-     * <p/>
-     * If this is a new subscription and the retroactive is false, then the last
-     * message sent to the topic should be set as the last message acknowledged by they new
-     * subscription.  Otherwise, if retroactive is true, then create the subscription without 
-     * it having an acknowledged message so that on recovery, all message recorded for the 
-     * topic get replayed.
-     * @param retroactive TODO
-     *
+     * Inserts the subscriber info due to a subscription change <p/> If this is a new subscription and the retroactive
+     * is false, then the last message sent to the topic should be set as the last message acknowledged by the new
+     * subscription. Otherwise, if retroactive is true, then create the subscription without it having an acknowledged
+     * message so that on recovery, all message recorded for the topic get replayed.
+     * 
+     * @param clientId
+     * @param subscriptionName
+     * @param selector
+     * @param retroactive
+     * @throws IOException
+     * 
      */
-    public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException;
-
+    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+                    throws IOException;
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Tue Sep 12 03:07:34 2006
@@ -53,6 +53,9 @@
 
     public abstract void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
             String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception;
+    
+    public abstract void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
+                    String subscriptionName, long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
 
     public abstract void doSetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId,
             String subscriptionName, String selector, boolean retroactive) throws SQLException, IOException;
@@ -79,5 +82,8 @@
 
     public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
 
+    public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException, IOException;
+    
+    public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriptionName) throws SQLException, IOException;
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -89,6 +89,33 @@
         }
     }
 
+    /**
+     * Asks the JDBC adapter for the subscriber's next messages and forwards each one to the supplied listener.
+     * The broker sequence id of lastMessageId (or -1 when lastMessageId is null) and maxReturned are passed to the
+     * adapter unchanged.
+     * 
+     * @param clientId
+     * @param subscriptionName
+     * @param lastMessageId may be null
+     * @param maxReturned
+     * @param listener receives the unmarshalled messages, message references and the finished() notification
+     * @throws Exception an IOException wrapping any SQLException, or whatever the adapter/listener throws
+     */
+    public void recoverNextMessages(final String clientId,final String subscriptionName, final MessageId lastMessageId,final int maxReturned,final MessageRecoveryListener listener) throws Exception{
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        try {
+            long lastSequence = lastMessageId != null ? lastMessageId.getBrokerSequenceId() : -1;
+            adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,lastSequence,maxReturned,
+                    new JDBCMessageRecoveryListener() {
+                        public void recoverMessage(long sequenceId, byte[] data) throws Exception {
+                            Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+                            // stamp the row's sequence id onto the message id before handing it on
+                            msg.getMessageId().setBrokerSequenceId(sequenceId);
+                            listener.recoverMessage(msg);
+                        }
+                        public void recoverMessageReference(String reference) throws Exception {
+                            listener.recoverMessageReference(reference);
+                        }
+                        
+                        public void finished(){
+                            listener.finished();
+                        }
+                    });
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+        
+    }
     /**
      * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
      *      boolean)
@@ -147,5 +174,41 @@
             c.close();
         }
     }
+
+    /**
+     * Fetches the next message to deliver to the given subscriber through the JDBC adapter.
+     * 
+     * @param clientId
+     * @param subscriptionName
+     * @return the unmarshalled message, or null when the adapter returned no data
+     * @throws IOException wrapping any SQLException raised by the adapter
+     */
+    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+        Message result = null;
+    
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        try {
+            byte[] data = adapter.doGetNextDurableSubscriberMessageStatement(c, destination, clientId, subscriptionName);
+            // DefaultJDBCAdapter returns null when the query yields no row; do not try to unmarshal that
+            if (data != null) {
+                result = (Message) wireFormat.unmarshal(new ByteSequence(data));
+            }
+               
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            throw IOExceptionSupport.create("Failed to get next message to deliver for: " + clientId + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+        return result;
+    }
+
+    /**
+     * Asks the JDBC adapter how many messages are outstanding for the given subscriber.
+     * 
+     * @param clientId
+     * @param subscriberName
+     * @return the count reported by the adapter
+     * @throws IOException wrapping any SQLException raised by the adapter
+     */
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        int result = 0;
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        try {
+            result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName);
+               
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            // say what actually failed instead of reusing the subscription-recovery message
+            throw IOExceptionSupport.create("Failed to get message count for: " + clientId + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+        return result;
+    }
+
+    
+
+    
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Tue Sep 12 03:07:34 2006
@@ -53,6 +53,7 @@
     private String updateLastAckOfDurableSubStatement;
     private String deleteSubscriptionStatement;
     private String findAllDurableSubMessagesStatement;
+    private String findDurableSubMessagesStatement;
     private String findAllDestinationsStatement;
     private String removeAllMessagesStatement;
     private String removeAllSubscriptionsStatement;
@@ -61,6 +62,8 @@
     private String[] dropSchemaStatements;
     private String lockCreateStatement;
     private String lockUpdateStatement;
+    private String nextDurableSubscriberMessageStatement;
+    private String durableSubscriberMessageCountStatement;
     private boolean useLockCreateWhereClause;
 
     public String[] getCreateSchemaStatements() {
@@ -204,6 +207,47 @@
         }
         return findAllDurableSubMessagesStatement;
     }
+    
+    /**
+     * Returns the findDurableSubMessagesStatement, building a default the first time it is requested while the field
+     * is still null. The default selects M.ID and M.MSG from the message table (alias M) and the ack table (alias D).
+     * <p/>
+     * NOTE(review): the generated text has a comma directly before the inner "where" ("M, where"), re-declares alias
+     * M inside the sub-select, and places its first "?" ahead of the container/client/subscription placeholders -
+     * confirm that this SQL parses and that callers bind parameters in this textual order.
+     * 
+     * @return the findDurableSubMessagesStatement
+     */
+    public String getFindDurableSubMessagesStatement(){
+        if(findDurableSubMessagesStatement==null){
+            findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
+                            +getFullAckTableName()+" D "+" WHERE ? >= ( select count(*) from "
+                            +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+                            +" AND M.CONTAINER=D.CONTAINER AND M.ID > ?"+" ORDER BY M.ID)";
+        }
+        return findDurableSubMessagesStatement;
+    }
+    
+    /**
+     * Returns the findAllDurableSubMessagesStatement, building a default the first time it is requested while the
+     * field is still null. The default selects M.ID and M.MSG for every message in the subscriber's container whose
+     * id is greater than the subscriber's LAST_ACKED_ID, ordered by M.ID.
+     * 
+     * @return the findAllDurableSubMessagesStatement
+     */
+    public String findAllDurableSubMessagesStatement() {
+        if (findAllDurableSubMessagesStatement == null) {
+            findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+                    + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+                    + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID";
+        }
+        return findAllDurableSubMessagesStatement;
+    }
+    
+    /**
+     * Returns the nextDurableSubscriberMessageStatement, building a default the first time it is requested while the
+     * field is still null. The default selects M.ID (column 1) and M.MSG (column 2).
+     * <p/>
+     * NOTE(review): the generated text has a comma directly before the inner "where" ("M, where") and re-declares
+     * alias M inside the sub-select - confirm that this SQL parses and returns the intended single row.
+     * 
+     * @return the nextDurableSubscriberMessageStatement
+     */
+    public String getNextDurableSubscriberMessageStatement(){
+        if (nextDurableSubscriberMessageStatement == null){
+            nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
+            +getFullAckTableName()+" D "+" WHERE 1 >= ( select count(*) from "
+            +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+            +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+" ORDER BY M.ID)"; 
+        }
+        return nextDurableSubscriberMessageStatement;
+    }
+    
+    /**
+     * Returns the durableSubscriberMessageCountStatement, building a default the first time it is requested while
+     * the field is still null. The default counts the messages in the subscriber's container whose id is greater
+     * than the subscriber's LAST_ACKED_ID; its placeholders are, in order, container, client id and subscription
+     * name.
+     * 
+     * @return the durableSubscriberMessageCountStatement
+     */
+    public String getDurableSubscriberMessageCountStatement(){
+        if (durableSubscriberMessageCountStatement==null){
+            // The ack table must be in the FROM clause for alias D to resolve, and there is no comma before WHERE.
+            durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM "
+            +getFullMessageTableName()+" M, "+getFullAckTableName()+" D"
+            +" WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+            +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
+        }
+        return durableSubscriberMessageCountStatement;
+    }
 
     public String getFindAllDestinationsStatement() {
         if (findAllDestinationsStatement == null) {
@@ -498,5 +542,27 @@
 
     public void setLockUpdateStatement(String lockUpdateStatement) {
         this.lockUpdateStatement = lockUpdateStatement;
+    }
+
+    /**
+     * @param findDurableSubMessagesStatement the findDurableSubMessagesStatement to set
+     */
+    public void setFindDurableSubMessagesStatement(String findDurableSubMessagesStatement){
+        this.findDurableSubMessagesStatement=findDurableSubMessagesStatement;
+    }
+
+    /**
+     * @param nextDurableSubscriberMessageStatement the nextDurableSubscriberMessageStatement to set
+     */
+    public void setNextDurableSubscriberMessageStatement(String nextDurableSubscriberMessageStatement){
+        this.nextDurableSubscriberMessageStatement=nextDurableSubscriberMessageStatement;
+    }
+
+
+    /**
+     * @param durableSubscriberMessageCountStatement the durableSubscriberMessageCountStatement to set
+     */
+    public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){
+        this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Tue Sep 12 03:07:34 2006
@@ -408,6 +408,58 @@
         }
         
     }
+    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, int maxReturned,JDBCMessageRecoveryListener listener) throws Exception {
+//      dumpTables(c, destination.getQualifiedName(),clientId,subscriptionName);
+
+      PreparedStatement s = null;
+      ResultSet rs = null;
+      try {
+
+          s = c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
+          s.setString(1, destination.getQualifiedName());
+          s.setString(2, clientId);
+          s.setString(3, subscriptionName);
+          s.setLong(4,seq);
+          s.setInt(5,maxReturned);
+          rs = s.executeQuery();
+
+          if( statements.isUseExternalMessageReferences() ) {
+              while (rs.next()) {
+                  listener.recoverMessageReference(rs.getString(2));
+              }
+          } else {
+              while (rs.next()) {
+                  listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2));
+              }
+          }
+
+      }
+      finally {
+          close(rs);
+          close(s);
+          listener.finished();
+      }
+      
+  }
+    
+    /**
+     * Runs the statement from Statements.getDurableSubscriberMessageCountStatement() with the container (qualified
+     * destination name), client id and subscription name bound in that order.
+     * 
+     * @param c transaction context supplying the connection
+     * @param destination
+     * @param clientId
+     * @param subscriptionName
+     * @return the value of the first column of the first row, or 0 when the query returns no row
+     * @throws SQLException
+     * @throws IOException
+     */
+    public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
+                    String subscriptionName) throws SQLException, IOException{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        int result = 0;
+        try{
+            s=c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setString(2,clientId);
+            s.setString(3,subscriptionName);
+            rs=s.executeQuery();
+            // a ResultSet starts positioned before the first row; it must be advanced before reading a column
+            if(rs.next()){
+                result =  rs.getInt(1);
+            }
+        }finally{
+            close(rs);
+            close(s);
+        }
+        return result;
+    }
 
     /**
      * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.apache.activemq.service.SubscriptionInfo)
@@ -607,6 +659,29 @@
 
     public void setStatements(Statements statements) {
         this.statements = statements;
+    }
+
+    /**
+     * Runs the statement from Statements.getNextDurableSubscriberMessageStatement() with the container (qualified
+     * destination name), client id and subscriber name bound in that order.
+     * 
+     * @param c transaction context supplying the connection
+     * @param destination
+     * @param clientId
+     * @param subscriberName
+     * @return the message bytes of the first row, or null when the query returns no row
+     * @throws SQLException
+     * @throws IOException
+     */
+    public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException,IOException{
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        try {
+
+            s = c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement());
+            s.setString(1, destination.getQualifiedName());
+            s.setString(2, clientId);
+            s.setString(3, subscriberName);
+            rs = s.executeQuery();
+
+            if (!rs.next()) {
+                return null;
+            }
+            // the statement selects M.ID, M.MSG - the message body is the second column, not the first
+            return getBinaryData(rs, 2);
+
+        }
+        finally {
+            close(rs);
+            close(s);
+        }
     }
 
     /*

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Tue Sep 12 03:07:34 2006
@@ -604,11 +604,14 @@
     }
 
     public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
-        if (newPercentUsage > 80 && oldPercentUsage < newPercentUsage) {
-            checkpoint(false, true);
+        newPercentUsage = ((newPercentUsage)/10)*10; // integer division: round down to a multiple of 10
+        oldPercentUsage = ((oldPercentUsage)/10)*10; // same rounding, so only a move into a higher 10% band counts as a rise
+        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) { // usage climbed into a new band at 70% or above
+            boolean sync = newPercentUsage >= 90; // first checkpoint argument is true from the 90% band upward (presumably a synchronous checkpoint - confirm against checkpoint())
+            checkpoint(sync, true);
         }
     }
-
+    
     public JournalTransactionStore getTransactionStore() {
         return transactionStore;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -57,6 +57,12 @@
         this.peristenceAdapter.checkpoint(true, true);
         longTermStore.recoverSubscription(clientId, subscriptionName, listener);
     }
+    
+    /**
+     * Checkpoints the persistence adapter, then hands the batched recovery request
+     * straight to the long term store.
+     *
+     * @param clientId client id of the durable subscriber
+     * @param subscriptionName name of the durable subscription
+     * @param lastMessageId passed through unchanged to the long term store
+     * @param maxReturned passed through unchanged to the long term store
+     * @param listener receives the recovered messages from the long term store
+     * @throws Exception if the checkpoint or the delegated recovery fails
+     */
+    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
+        // presumably pushes journaled writes into the long term store first - confirm against checkpoint()
+        peristenceAdapter.checkpoint(true,true);
+        longTermStore.recoverNextMessages(clientId,subscriptionName,lastMessageId,maxReturned,listener);
+    }
 
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
         return longTermStore.lookupSubscription(clientId, subscriptionName);
@@ -183,5 +189,17 @@
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return longTermStore.getAllSubscriptions();
     }
+
+    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+        this.peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getNextMessageToDeliver(clientId,subscriptionName);
+    }
+
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        this.peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getMessageCount(clientId,subscriberName);
+    }
+
+    
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -71,6 +71,25 @@
         });
 
     }
+    
+    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId, int maxReturned,final MessageRecoveryListener listener) throws Exception{
+        this.peristenceAdapter.checkpoint(true, true);
+        longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,new MessageRecoveryListener() {
+            public void recoverMessage(Message message) throws Exception {
+                throw new IOException("Should not get called.");
+            }
+            public void recoverMessageReference(String messageReference) throws Exception {
+                RecordLocation loc = toRecordLocation(messageReference);
+                Message message = (Message) peristenceAdapter.readCommand(loc);
+                listener.recoverMessage(message);
+            }
+            
+            public void finished(){
+                listener.finished();
+            }
+        });
+        
+    }
 
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
         return longTermStore.lookupSubscription(clientId, subscriptionName);
@@ -197,5 +216,17 @@
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return longTermStore.getAllSubscriptions();
     }
+    
+    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+        this.peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getNextMessageToDeliver(clientId,subscriptionName);
+    }
+    
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        this.peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getMessageCount(clientId,subscriberName);
+    }
+
+   
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -148,6 +148,42 @@
             listener.finished();
         }
     }
+    
+    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+                    MessageRecoveryListener listener) throws Exception{
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        if(list!=null){
+            boolean startFound=false;
+            int count = 0;
+            for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
+                Object msg=messageContainer.get(i.next());
+                if(msg!=null){
+                    if(msg.getClass()==String.class){
+                        String ref=msg.toString();
+                        if (startFound || lastMessageId == null){
+                            listener.recoverMessageReference(ref);
+                            count++;
+                        }
+                        else if(startFound||ref.equals(lastMessageId.toString())){
+                            startFound=true;
+                        }
+                    }else{
+                        Message message=(Message) msg;
+                        if(startFound||message.getMessageId().equals(lastMessageId)){
+                            startFound=true;
+                        }else{
+                            listener.recoverMessage(message);
+                            count++;
+                        }
+                    }
+                }
+                listener.finished();
+            }
+        }else{
+            listener.finished();
+        }
+    }
 
     public void delete(){
         super.delete();
@@ -172,4 +208,20 @@
         container.setMarshaller(marshaller);
         subscriberAcks.put(key,container);
     }
+
+    /**
+     * Returns the message behind the first entry of the subscriber's list.
+     * <p>
+     * The list entries are used as keys into the message container, matching how
+     * <code>recoverNextMessages</code> in this class reads them, so the first
+     * entry is resolved through the container instead of being cast directly.
+     *
+     * @param clientId client id of the durable subscriber
+     * @param subscriptionName name of the durable subscription
+     * @return the first pending message, or <code>null</code> when the subscription
+     *         is unknown, its list is empty, or the container holds no value for the entry
+     * @throws IOException declared by the store interface
+     */
+    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        if(list==null){
+            // unknown subscription: nothing to deliver
+            return null;
+        }
+        Iterator iter=list.iterator();
+        if(!iter.hasNext()){
+            return null;
+        }
+        // NOTE(review): a String reference stored in the container would still fail this cast - confirm whether references can occur here
+        return (Message) messageContainer.get(iter.next());
+    }
+
+    /**
+     * Returns the number of entries in the subscriber's list.
+     *
+     * @param clientId client id of the durable subscriber
+     * @param subscriberName name of the durable subscription
+     * @return the list size, or 0 when no list exists for the subscription
+     * @throws IOException declared by the store interface
+     */
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        String key=getSubscriptionKey(clientId,subscriberName);
+        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        // an unknown subscription has nothing pending; avoid the NullPointerException
+        return list!=null?list.size():0;
+    }
+
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -111,6 +111,40 @@
             listener.finished();
         }
     }
+    
+    /**
+     * Recovers at most <code>maxReturned</code> entries from the message table,
+     * beginning with the entry that follows <code>lastMessageId</code>. A
+     * <code>null</code> <code>lastMessageId</code> starts at the first entry.
+     * <code>listener.finished()</code> is called once after iteration.
+     *
+     * @param clientId client id of the durable subscriber (not used to filter entries here)
+     * @param subscriptionName name of the durable subscription (not used to filter entries here)
+     * @param lastMessageId exclusive start marker, or <code>null</code> to start from the first entry
+     * @param maxReturned upper bound on the number of entries passed to the listener
+     * @param listener receives message references (String values) or full messages
+     * @throws Exception if the listener rejects an entry
+     */
+    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
+        // without a start marker every entry is eligible from the beginning
+        boolean startFound=lastMessageId==null;
+        // the message table is a synchronizedMap - so just have to synchronize here
+        synchronized(messageTable){
+            int count=0;
+            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext()&&count<maxReturned;){
+                Map.Entry entry=(Entry) iter.next();
+                Object msg=entry.getValue();
+                if(msg.getClass()==String.class){
+                    String ref=msg.toString();
+                    if(startFound){
+                        listener.recoverMessageReference(ref);
+                        count++;
+                    }else if(ref.equals(lastMessageId.toString())){
+                        // the marker itself is not redelivered; recovery starts with the next entry
+                        startFound=true;
+                    }
+                }else{
+                    Message message=(Message) msg;
+                    if(startFound){
+                        listener.recoverMessage(message);
+                        count++;
+                    }else if(message.getMessageId().equals(lastMessageId)){
+                        startFound=true;
+                    }
+                }
+            }
+            listener.finished();
+        }
+    }
 
     public void delete() {
         super.delete();
@@ -122,4 +156,34 @@
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
     }
+    /**
+     * Looks up the id recorded in the ack database for the subscription and returns
+     * the message table value stored under that id.
+     * <p>
+     * NOTE(review): this returns the entry keyed by the recorded id itself rather
+     * than the entry after it, and returns <code>null</code> when no id is
+     * recorded - confirm this is the intended meaning of "next".
+     *
+     * @param clientId client id of the durable subscriber
+     * @param subscriptionName name of the durable subscription
+     * @return the value stored under the recorded id, or <code>null</code> if no key matches
+     * @throws IOException declared by the store interface
+     */
+    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+        SubscriptionKey subscription=new SubscriptionKey(clientId,subscriptionName);
+        MessageId lastAck=(MessageId) ackDatabase.get(subscription);
+        // iteration over the synchronizedMap has to hold its lock
+        synchronized(messageTable){
+            Iterator entries=messageTable.entrySet().iterator();
+            while(entries.hasNext()){
+                Map.Entry candidate=(Entry) entries.next();
+                if(candidate.getKey().equals(lastAck)){
+                    return (Message) candidate.getValue();
+                }
+            }
+        }
+        return null;
+    }
+    /**
+     * Counts the message table entries from the one keyed by the subscription's
+     * recorded ack id (inclusive) to the end of the table.
+     * <p>
+     * NOTE(review): when no key matches the recorded id (including when none is
+     * recorded) every entry is skipped and the result is 0 - confirm that is intended.
+     *
+     * @param clientId client id of the durable subscriber
+     * @param subscriberName name of the durable subscription
+     * @return table size minus the number of entries that precede the matching key
+     * @throws IOException declared by the store interface
+     */
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
+        // iteration over the synchronizedMap has to hold its lock
+        synchronized(messageTable){
+            int skipped=0;
+            Iterator entries=messageTable.entrySet().iterator();
+            while(entries.hasNext()){
+                Map.Entry candidate=(Entry) entries.next();
+                if(candidate.getKey().equals(lastAck)){
+                    break;
+                }
+                skipped++;
+            }
+            return messageTable.size()-skipped;
+        }
+    }
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -94,6 +94,42 @@
         }
         
     }
+    
+    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+                    MessageRecoveryListener listener) throws Exception{
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        if(list!=null){
+            boolean startFound=false;
+            int count = 0;
+            for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
+                Object msg=messageContainer.get(i.next());
+                if(msg!=null){
+                    if(msg.getClass()==String.class){
+                        String ref=msg.toString();
+                        if (startFound || lastMessageId == null){
+                            listener.recoverMessageReference(ref);
+                            count++;
+                        }
+                        else if(startFound||ref.equals(lastMessageId.toString())){
+                            startFound=true;
+                        }
+                    }else{
+                        Message message=(Message) msg;
+                        if(startFound||message.getMessageId().equals(lastMessageId)){
+                            startFound=true;
+                        }else{
+                            listener.recoverMessage(message);
+                            count++;
+                        }
+                    }
+                }
+                listener.finished();
+            }
+        }else{
+            listener.finished();
+        }
+    }
 
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
         return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
@@ -290,6 +326,20 @@
         Marshaller marshaller=new StringMarshaller();
         container.setMarshaller(marshaller);
         subscriberAcks.put(key,container);
+    }
+    
+    /**
+     * Returns the message behind the first entry of the subscriber's list.
+     * <p>
+     * The list entries are used as keys into the message container, matching how
+     * <code>recoverNextMessages</code> in this class reads them, so the first
+     * entry is resolved through the container instead of being cast directly.
+     *
+     * @param clientId client id of the durable subscriber
+     * @param subscriptionName name of the durable subscription
+     * @return the first pending message, or <code>null</code> when the subscription
+     *         is unknown, its list is empty, or the container holds no value for the entry
+     * @throws IOException declared by the store interface
+     */
+    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        if(list==null){
+            // unknown subscription: nothing to deliver
+            return null;
+        }
+        Iterator iter=list.iterator();
+        if(!iter.hasNext()){
+            return null;
+        }
+        // NOTE(review): a String reference stored in the container would still fail this cast - confirm whether references can occur here
+        return (Message) messageContainer.get(iter.next());
+    }
+    
+    /**
+     * Returns the number of entries in the subscriber's list.
+     *
+     * @param clientId client id of the durable subscriber
+     * @param subscriberName name of the durable subscription
+     * @return the list size, or 0 when no list exists for the subscription
+     * @throws IOException declared by the store interface
+     */
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        String key=getSubscriptionKey(clientId,subscriberName);
+        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        // an unknown subscription has nothing pending; avoid the NullPointerException
+        return list!=null?list.size():0;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties (original)
+++ incubator/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties Tue Sep 12 03:07:34 2006
@@ -1,26 +1,9 @@
-## ---------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements.  See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License.  You may obtain a copy of the License at
-## 
-## http://www.apache.org/licenses/LICENSE-2.0
-## 
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ---------------------------------------------------------------------------
-
 #
 # The logging properties used for eclipse testing, We want to see debug output on the console.
 #
 log4j.rootLogger=WARN, out
 
-log4j.logger.org.apache.activemq=DEBUG
+log4j.logger.org.apache.activemq=INFO
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java Tue Sep 12 03:07:34 2006
@@ -28,6 +28,7 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.DefaultPersistenceAdapterFactory;
 import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
 /**
  * @version $Revision$
@@ -52,7 +53,14 @@
         super.setUp();
         broker=new BrokerService();
         
-        broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
+        //broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
+        /*
+        DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory();
+        factory.setDataDirectoryFile(broker.getDataDirectory());
+        factory.setTaskRunnerFactory(broker.getTaskRunnerFactory());
+        factory.setUseJournal(false);
+        broker.setPersistenceFactory(factory);
+        */
         broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
         broker.start();
         connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);