You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/10/09 15:05:22 UTC

svn commit: r454368 [1/3] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/kaha/ main...

Author: rajdavies
Date: Mon Oct  9 06:05:20 2006
New Revision: 454368

URL: http://svn.apache.org/viewvc?view=rev&rev=454368
Log:
 changes for https://issues.apache.org/activemq/browse/AMQ-845 -
provide support for durable topic cursors

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Oct  9 06:05:20 2006
@@ -454,6 +454,7 @@
         if (broker != null) {
             stopper.stop(broker);
         }
+        tempDataStore.close();
 
         if (isUseJmx()) {
             MBeanServer mbeanServer = getManagementContext().getMBeanServer();
@@ -957,7 +958,7 @@
     /**
      * @return the tempDataStore
      */
-    public Store getTempDataStore() {
+    public synchronized Store getTempDataStore() {
         if (tempDataStore == null){
             String name = getTmpDataDirectory().getPath();
             try {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Oct  9 06:05:20 2006
@@ -41,9 +41,9 @@
     private boolean active=false;
     
     public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
-        //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName()));
-       // super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
-        super(broker,context,info);
+       //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),info.getPrefetchSize()));
+       //super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
+       super(broker,context,info);
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Oct  9 06:05:20 2006
@@ -124,7 +124,8 @@
         
     synchronized public void add(MessageReference node) throws Exception{
         enqueueCounter++;
-        if(!isFull()){
+        //if(!isFull()){
+        if(!isFull() && pending.isEmpty() && canDispatch(node)){
             dispatch(node);
         }else{
             optimizePrefetch();
@@ -196,8 +197,6 @@
                         }
                         dispatchMatched();
                         return;
-                    }else{
-                        // System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
                     }
                 }
             }
@@ -435,8 +434,7 @@
     /**
      * @param node
      * @param message
-     *            TODO
-     * @return
+     * @return MessageDispatch
      */
     protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
         if( node == QueueMessageReference.NULL_MESSAGE ) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Oct  9 06:05:20 2006
@@ -1,43 +1,96 @@
 /**
  * 
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  * 
  * http://www.apache.org/licenses/LICENSE-2.0
  * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
+
 package org.apache.activemq.broker.region.cursors;
 
+import java.io.IOException;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
-
+import org.apache.activemq.broker.region.MessageReference;
 
 /**
- * Default method holder for pending message (messages awaiting disptach to a consumer) cursor
+ * Abstract method holder for pending message (messages awaiting disptach to a
+ * consumer) cursor
  * 
  * @version $Revision$
  */
-public abstract class AbstractPendingMessageCursor implements  PendingMessageCursor{
-    
+public class AbstractPendingMessageCursor implements PendingMessageCursor {
+    protected int maxBatchSize = 100;
+
     public void start() throws Exception{
     }
-    
+
     public void stop() throws Exception{
     }
-    
-    public void add(ConnectionContext context, Destination destination) throws Exception{
+
+    public void add(ConnectionContext context,Destination destination)
+            throws Exception{
     }
 
-    public void remove(ConnectionContext context, Destination destination) throws Exception{
+    public void remove(ConnectionContext context,Destination destination)
+            throws Exception{
     }
-    
-    
+
     public boolean isRecoveryRequired(){
         return true;
     }
+
+    public void addMessageFirst(MessageReference node) throws Exception{
+    }
+
+    public void addMessageLast(MessageReference node) throws Exception{
+    }
+
+    public void clear(){
+    }
+
+    public boolean hasNext(){
+        return false;
+    }
+
+    public boolean isEmpty(){
+        return false;
+    }
+
+    public MessageReference next(){
+        return null;
+    }
+
+    public void remove(){
+    }
+
+    public void reset(){
+    }
+
+    public int size(){
+        return 0;
+    }
+    
+    public int getMaxBatchSize(){
+        return maxBatchSize;
+    }
+
+    public void setMaxBatchSize(int maxBatchSize){
+        this.maxBatchSize=maxBatchSize;
+    }
+
+    protected void fillBatch() throws Exception{
+    }
+    
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Mon Oct  9 06:05:20 2006
@@ -13,6 +13,8 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import java.io.IOException;
+
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
@@ -55,14 +57,17 @@
     /**
      * add message to await dispatch
      * @param node
+     * @throws IOException 
+     * @throws Exception 
      */
-    public void addMessageLast(MessageReference node);
+    public void addMessageLast(MessageReference node) throws  Exception;
     
     /**
      * add message to await dispatch
      * @param node
+     * @throws Exception 
      */
-    public void addMessageFirst(MessageReference node);
+    public void addMessageFirst(MessageReference node) throws Exception;
 
     /**
      * @return true if there pending messages to dispatch
@@ -94,8 +99,18 @@
     /**
      * Informs the Broker if the subscription needs to intervention to recover it's state
      * e.g. DurableTopicSubscriber may do
-     * @see org.apache.activemq.region.cursors.PendingMessageCursor
      * @return true if recovery required
      */
     public boolean isRecoveryRequired();
+    
+    /**
+     * @return the maximum batch size
+     */
+    public int getMaxBatchSize();
+
+    /**
+     * Set the max batch size
+     * @param maxBatchSize
+     */
+    public void setMaxBatchSize(int maxBatchSize);
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Oct  9 06:05:20 2006
@@ -11,6 +11,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
@@ -22,24 +23,28 @@
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.Store;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
  * 
  * @version $Revision$
  */
 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
+
     static private final Log log=LogFactory.getLog(StoreDurableSubscriberCursor.class);
     private int pendingCount=0;
     private String clientId;
     private String subscriberName;
-    private int maxBatchSize=10;
-    private LinkedList batchList=new LinkedList();
     private Map topics=new HashMap();
     private LinkedList storePrefetches=new LinkedList();
-    private AtomicBoolean started=new AtomicBoolean();
+    private boolean started;
+    private PendingMessageCursor nonPersistent;
+    private PendingMessageCursor currentCursor;
 
     /**
      * @param topic
@@ -47,24 +52,26 @@
      * @param subscriberName
      * @throws IOException
      */
-    public StoreDurableSubscriberCursor(String clientId,String subscriberName){
+    public StoreDurableSubscriberCursor(String clientId,String subscriberName,Store store,int maxBatchSize){
         this.clientId=clientId;
         this.subscriberName=subscriberName;
+        this.nonPersistent=new FilePendingMessageCursor(clientId+subscriberName,store);
+        storePrefetches.add(nonPersistent);
     }
 
     public synchronized void start() throws Exception{
-        started.set(true);
+        started=true;
         for(Iterator i=storePrefetches.iterator();i.hasNext();){
-            TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
+            PendingMessageCursor tsp=(PendingMessageCursor)i.next();
             tsp.start();
             pendingCount+=tsp.size();
         }
     }
 
     public synchronized void stop() throws Exception{
-        started.set(false);
+        started=false;
         for(Iterator i=storePrefetches.iterator();i.hasNext();){
-            TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
+            PendingMessageCursor tsp=(PendingMessageCursor)i.next();
             tsp.stop();
         }
         pendingCount=0;
@@ -78,10 +85,11 @@
      * @throws Exception
      */
     public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
-        TopicStorePrefetch tsp=new TopicStorePrefetch((Topic) destination,batchList,clientId,subscriberName);
+        TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName);
+        tsp.setMaxBatchSize(getMaxBatchSize());
         topics.put(destination,tsp);
         storePrefetches.add(tsp);
-        if(started.get()){
+        if(started){
             tsp.start();
             pendingCount+=tsp.size();
         }
@@ -95,7 +103,7 @@
      * @throws Exception
      */
     public synchronized void remove(ConnectionContext context,Destination destination) throws Exception{
-        TopicStorePrefetch tsp=(TopicStorePrefetch) topics.remove(destination);
+        Object tsp=topics.remove(destination);
         if(tsp!=null){
             storePrefetches.remove(tsp);
         }
@@ -119,12 +127,32 @@
         return false;
     }
 
-    public synchronized void addMessageFirst(MessageReference node){
-        pendingCount++;
+    public synchronized void addMessageFirst(MessageReference node) throws IOException{
+        if(started){
+            throw new RuntimeException("This shouldn't be called!");
+        }
     }
 
-    public synchronized void addMessageLast(MessageReference node){
-        pendingCount++;
+    public synchronized void addMessageLast(MessageReference node) throws Exception{
+        if(started){
+            if(node!=null){
+                Message msg=node.getMessage();
+                if(!msg.isPersistent()){
+                    nonPersistent.addMessageLast(node);
+                }else{
+                    Destination dest=msg.getRegionDestination();
+                    TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
+                    if(tsp!=null){
+                        tsp.addMessageLast(node);
+                        // if the store has been empty - then this message is next to dispatch
+                        if((pendingCount-nonPersistent.size())<=0){
+                            tsp.nextToDispatch(node.getMessageId());
+                        }
+                    }
+                }
+                pendingCount++;
+            }
+        }
     }
 
     public void clear(){
@@ -132,49 +160,56 @@
     }
 
     public synchronized boolean hasNext(){
-        return !isEmpty();
-    }
-
-    public synchronized MessageReference next(){
-        MessageReference result=null;
-        if(!isEmpty()){
-            if(batchList.isEmpty()){
-                try{
-                    fillBatch();
-                }catch(Exception e){
-                    log.error("Couldn't fill batch from store ",e);
-                    throw new RuntimeException(e);
-                }
-            }
-            if(!batchList.isEmpty()){
-                result=(MessageReference) batchList.removeFirst();
+        boolean result=pendingCount>0;
+        if(result){
+            try{
+                currentCursor=getNextCursor();
+            }catch(Exception e){
+                log.error("Failed to get current cursor ",e);
+                throw new RuntimeException(e);
             }
+            result=currentCursor!=null?currentCursor.hasNext():false;
         }
         return result;
     }
 
+    public synchronized MessageReference next(){
+        return currentCursor!=null?currentCursor.next():null;
+    }
+
     public synchronized void remove(){
+        if(currentCursor!=null){
+            currentCursor.remove();
+        }
         pendingCount--;
     }
 
-    public void reset(){
-        batchList.clear();
+    public synchronized void reset(){
+        for(Iterator i=storePrefetches.iterator();i.hasNext();){
+            AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
+            tsp.reset();
+        }
     }
 
     public int size(){
         return pendingCount;
     }
 
-    private synchronized void fillBatch() throws Exception{
-        for(Iterator i=storePrefetches.iterator();i.hasNext();){
-            TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
-            tsp.fillBatch();
-            if(batchList.size()>=maxBatchSize){
-                break;
+    protected synchronized PendingMessageCursor getNextCursor() throws Exception{
+        if(currentCursor==null||currentCursor.isEmpty()){
+            currentCursor=null;
+            for(Iterator i=storePrefetches.iterator();i.hasNext();){
+                AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
+                tsp.setMaxBatchSize(getMaxBatchSize());
+                if(tsp.hasNext()){
+                    currentCursor=tsp;
+                    break;
+                }
             }
+            // round-robin
+            Object obj=storePrefetches.removeFirst();
+            storePrefetches.addLast(obj);
         }
-        // round-robin
-        Object obj=storePrefetches.removeFirst();
-        storePrefetches.addLast(obj);
+        return currentCursor;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Oct  9 06:05:20 2006
@@ -1,20 +1,27 @@
 /**
  * 
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  * 
  * http://www.apache.org/licenses/LICENSE-2.0
  * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
+
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
 import java.util.LinkedList;
+import javax.jms.JMSException;
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
@@ -23,134 +30,114 @@
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 /**
- * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
+ * perist pending messages pending message (messages awaiting disptach to a
+ * consumer) cursor
  * 
  * @version $Revision$
  */
-class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{
+class TopicStorePrefetch extends AbstractPendingMessageCursor implements
+        MessageRecoveryListener {
+
     static private final Log log=LogFactory.getLog(TopicStorePrefetch.class);
-    private Topic topic;
+   
     private TopicMessageStore store;
-    private LinkedList batchList;
+    private final LinkedList batchList=new LinkedList();
     private String clientId;
     private String subscriberName;
-    private int pendingCount=0;
     private MessageId lastMessageId;
-    private int maxBatchSize=10;
+    private Destination regionDestination;
 
     /**
      * @param topic
-     * @param batchList
      * @param clientId
      * @param subscriberName
      * @throws IOException
      */
-    public TopicStorePrefetch(Topic topic,LinkedList batchList,String clientId,String subscriberName){
-        this.topic=topic;
-        this.store=(TopicMessageStore) topic.getMessageStore();
-        this.batchList=batchList;
+    public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){
+        this.regionDestination = topic;
+        this.store=(TopicMessageStore)topic.getMessageStore();
         this.clientId=clientId;
         this.subscriberName=subscriberName;
     }
 
     public void start() throws Exception{
-        pendingCount=store.getMessageCount(clientId,subscriberName);
-        System.err.println("Pending count = "+pendingCount);
     }
 
     public void stop() throws Exception{
-        pendingCount=0;
-        lastMessageId=null;
+        store.resetBatching(clientId,clientId,null);
     }
 
     /**
      * @return true if there are no pending messages
      */
     public boolean isEmpty(){
-        return pendingCount==0;
-    }
-
-    /**
-     * Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
-     * may do
-     * 
-     * @see org.apache.activemq.region.cursors.PendingMessageCursor
-     * @return true if recovery required
-     */
-    public boolean isRecoveryRequired(){
-        return false;
+        return batchList.isEmpty();
     }
-
-    public synchronized void addMessageFirst(MessageReference node){
-        pendingCount++;
-    }
-
-    public synchronized void addMessageLast(MessageReference node){
-        pendingCount++;
-    }
-
-    public void clear(){
-        pendingCount=0;
-        lastMessageId=null;
+    
+    public synchronized int size(){
+        try{
+            return store.getMessageCount(clientId,subscriberName);
+        }catch(IOException e){
+            log.error(this + " Failed to get the outstanding message count from the store",e);
+            throw new RuntimeException(e);
+        }
     }
 
     public synchronized boolean hasNext(){
+        if(isEmpty()){
+            try{
+                fillBatch();
+            }catch(Exception e){
+                log.error("Failed to fill batch",e);
+                throw new RuntimeException(e);
+            }
+        }
         return !isEmpty();
     }
 
     public synchronized MessageReference next(){
-        MessageReference result=null;
-        if(!isEmpty()){
-            if(batchList.isEmpty()){
-                try{
-                    fillBatch();
-                }catch(Exception e){
-                    log.error(topic.getDestination()+" Couldn't fill batch from store ",e);
-                    throw new RuntimeException(e);
-                }
-            }
-            result=(MessageReference) batchList.removeFirst();
-        }
+        Message result = (Message)batchList.removeFirst();
+        result.setRegionDestination(regionDestination);
         return result;
     }
 
-    public synchronized void remove(){
-        pendingCount--;
-    }
-
     public void reset(){
-        batchList.clear();
-    }
-
-    public int size(){
-        return pendingCount;
     }
 
     // MessageRecoveryListener implementation
-    public void finished(){}
+    public void finished(){
+    }
 
     public void recoverMessage(Message message) throws Exception{
+        message.setRegionDestination(regionDestination);
         batchList.addLast(message);
     }
 
-    public void recoverMessageReference(String messageReference) throws Exception{
+    public void recoverMessageReference(String messageReference)
+            throws Exception{
         // shouldn't get called
         throw new RuntimeException("Not supported");
     }
 
     // implementation
     protected void fillBatch() throws Exception{
-        if(pendingCount<=0){
-            pendingCount=store.getMessageCount(clientId,subscriberName);
-        }
-        if(pendingCount>0){
-            store.recoverNextMessages(clientId,subscriberName,lastMessageId,maxBatchSize,this);
-            // this will add more messages to the batch list
-            if(!batchList.isEmpty()){
-                Message message=(Message) batchList.getLast();
-                lastMessageId=message.getMessageId();
-            }
+        store.recoverNextMessages(clientId,subscriberName,lastMessageId,
+                maxBatchSize,this);
+        // this will add more messages to the batch list
+        if(!batchList.isEmpty()){
+            Message message=(Message)batchList.getLast();
+            lastMessageId=message.getMessageId();
         }
+    }
+    
+    public String toString() {
+        return "TopicStorePrefetch" + System.identityHashCode(this) + "("+clientId+","+subscriberName+")";
+    }
+    
+    synchronized void nextToDispatch(MessageId id) throws Exception {
+        lastMessageId = store.getPreviousMessageIdToDeliver(clientId,clientId,id);
+        store.resetBatching(clientId,clientId,id);        
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java Mon Oct  9 06:05:20 2006
@@ -1,58 +1,55 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.kaha;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
+
 /**
- *Represents a container of persistent objects in the store
- *Acts as a map, but values can be retrieved in insertion order
+ * Represents a container of persistent objects in the store Acts as a map, but values can be retrieved in insertion
+ * order
  * 
  * @version $Revision: 1.2 $
  */
 public interface ListContainer extends List{
+
     /**
-     * The container is created or retrieved in 
-     * an unloaded state.
-     * load populates the container will all the indexes used etc
-     * and should be called before any operations on the container
+     * The container is created or retrieved in an unloaded state. load populates the container will all the indexes
+     * used etc and should be called before any operations on the container
      */
     public void load();
-    
+
     /**
      * unload indexes from the container
-     *
+     * 
      */
     public void unload();
-    
+
     /**
      * @return true if the indexes are loaded
      */
     public boolean isLoaded();
-    
-    
+
     /**
-     * For homogenous containers can set a custom marshaller for loading values
-     * The default uses Object serialization
-     * @param marshaller 
+     * For homogenous containers can set a custom marshaller for loading values The default uses Object serialization
+     * 
+     * @param marshaller
      */
     public void setMarshaller(Marshaller marshaller);
+
     /**
      * @return the id the MapContainer was create with
      */
@@ -62,46 +59,46 @@
      * @return the number of values in the container
      */
     public int size();
-    
+
     /**
      * Inserts the given element at the beginning of this list.
-     *
+     * 
      * @param o the element to be inserted at the beginning of this list.
      */
     public void addFirst(Object o);
 
     /**
-     * Appends the given element to the end of this list.  (Identical in
-     * function to the <tt>add</tt> method; included only for consistency.)
-     *
+     * Appends the given element to the end of this list. (Identical in function to the <tt>add</tt> method; included
+     * only for consistency.)
+     * 
      * @param o the element to be inserted at the end of this list.
      */
     public void addLast(Object o);
-    
+
     /**
      * Removes and returns the first element from this list.
-     *
+     * 
      * @return the first element from this list.
-     * @throws    NoSuchElementException if this list is empty.
+     * @throws NoSuchElementException if this list is empty.
      */
     public Object removeFirst();
 
     /**
      * Removes and returns the last element from this list.
-     *
+     * 
      * @return the last element from this list.
-     * @throws    NoSuchElementException if this list is empty.
+     * @throws NoSuchElementException if this list is empty.
      */
     public Object removeLast();
-    
-    
+
     /**
      * remove an objecr from the list without retrieving the old value from the store
+     * 
      * @param position
      * @return true if successful
      */
     public boolean doRemove(int position);
-    
+
     /**
      * @return the maximumCacheSize
      */
@@ -111,46 +108,87 @@
      * @param maximumCacheSize the maximumCacheSize to set
      */
     public void setMaximumCacheSize(int maximumCacheSize);
-      
+
     /**
      * clear any cached values
      */
     public void clearCache();
-    
+
     /**
      * add an Object to the list but get a StoreEntry of its position
+     * 
      * @param object
      * @return the entry in the Store
      */
     public StoreEntry placeLast(Object object);
-    
+
     /**
      * insert an Object in first position int the list but get a StoreEntry of its position
+     * 
      * @param object
      * @return the location in the Store
      */
     public StoreEntry placeFirst(Object object);
-    
+
     /**
      * Advanced feature = must ensure the object written doesn't overwrite other objects in the container
-     * @param entry 
-     * @param object 
+     * 
+     * @param entry
+     * @param object
      */
-    public void update(StoreEntry entry, Object object);
-    
+    public void update(StoreEntry entry,Object object);
+
     /**
      * Retrieve an Object from the Store by its location
+     * 
      * @param entry
      * @return the Object at that entry
      */
     public Object get(StoreEntry entry);
-    
+
+    /**
+     * Get the StoreEntry for the first item of the list
+     * 
+     * @return the first StoreEntry or null if the list is empty
+     */
+    public StoreEntry getFirst();
+
+    /**
+     * Get yjr StoreEntry for the last item of the list
+     * 
+     * @return the last StoreEntry or null if the list is empty
+     */
+    public StoreEntry getLast();
+
+    /**
+     * Get the next StoreEntry from the list
+     * 
+     * @param entry
+     * @return the next StoreEntry or null
+     */
+    public StoreEntry getNext(StoreEntry entry);
+
+    /**
+     * Get the previous StoreEntry from the list
+     * 
+     * @param entry
+     * @return the previous store entry or null
+     */
+    public StoreEntry getPrevious(StoreEntry entry);
+
     /**
      * remove the Object at the StoreEntry
+     * 
      * @param entry
      * @return true if successful
      */
     public boolean remove(StoreEntry entry);
     
-    
+    /**
+     * It's possible that a StoreEntry could be come stale
+     * this will return an upto date entry for the StoreEntry position
+     * @param entry old entry
+     * @return a refreshed StoreEntry
+     */
+    public StoreEntry refresh(StoreEntry entry);
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Mon Oct  9 06:05:20 2006
@@ -381,7 +381,7 @@
         if(type==null||(!type.equals(IndexTypes.DISK_INDEX)&&!type.equals(IndexTypes.IN_MEMORY_INDEX))){
             throw new RuntimeException("Unknown IndexType: "+type);
         }
-        this.indexType=indexType;
+        this.indexType=type;
     }
     
     public synchronized void initialize() throws IOException{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java Mon Oct  9 06:05:20 2006
@@ -18,9 +18,7 @@
 package org.apache.activemq.kaha.impl.container;
 
 import java.util.ListIterator;
-
 import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.kaha.impl.index.IndexLinkedList;
 
 /** 
 * @version $Revision$
@@ -28,14 +26,12 @@
 public class CachedContainerListIterator implements ListIterator{
     
     protected ListContainerImpl container;
-    protected IndexLinkedList list;
     protected int pos = 0;
     protected int nextPos =0;
     protected StoreEntry currentItem;
 
     protected CachedContainerListIterator(ListContainerImpl container,int start){
         this.container=container;
-        this.list=list;
         this.pos=start;
         this.nextPos = this.pos;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Mon Oct  9 06:05:20 2006
@@ -1,19 +1,15 @@
 /**
  * 
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  * 
  * http://www.apache.org/licenses/LICENSE-2.0
  * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 
 package org.apache.activemq.kaha.impl.container;
@@ -52,8 +48,8 @@
     protected int maximumCacheSize=100;
     protected IndexItem lastCached;
 
-    public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,String indexType)
-            throws IOException{
+    public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
+            String indexType) throws IOException{
         super(id,root,indexManager,dataManager,indexType);
     }
 
@@ -462,15 +458,15 @@
         indexList.add(index,item);
         itemAdded(item,index,element);
     }
-    
-    protected StoreEntry internalAddLast(Object o) {
+
+    protected StoreEntry internalAddLast(Object o){
         load();
         IndexItem item=writeLast(o);
         indexList.addLast(item);
         itemAdded(item,indexList.size()-1,o);
         return item;
     }
-    
+
     protected StoreEntry internalAddFirst(Object o){
         load();
         IndexItem item=writeFirst(o);
@@ -486,8 +482,6 @@
         itemAdded(item,index,element);
         return item;
     }
-    
-    
 
     protected StoreEntry internalGet(int index){
         load();
@@ -623,27 +617,29 @@
         }
         return result;
     }
-    
+
     /**
      * add an Object to the list but get a StoreEntry of its position
+     * 
      * @param object
      * @return the entry in the Store
      */
-    public synchronized StoreEntry placeLast(Object object) {
-        StoreEntry item = internalAddLast(object);
+    public synchronized StoreEntry placeLast(Object object){
+        StoreEntry item=internalAddLast(object);
         return item;
     }
-    
+
     /**
      * insert an Object in first position int the list but get a StoreEntry of its position
+     * 
      * @param object
      * @return the location in the Store
      */
-    public synchronized StoreEntry placeFirst(Object object) {
-        StoreEntry item = internalAddFirst(object);
+    public synchronized StoreEntry placeFirst(Object object){
+        StoreEntry item=internalAddFirst(object);
         return item;
     }
-    
+
     /**
      * @param entry
      * @param object
@@ -651,41 +647,90 @@
      */
     public void update(StoreEntry entry,Object object){
         try{
-            dataManager.updateItem(entry.getValueDataItem(),marshaller, object);
+            dataManager.updateItem(entry.getValueDataItem(),marshaller,object);
         }catch(IOException e){
             throw new RuntimeException(e);
         }
-        
+    }
+
+    /**
+     * Retrieve an Object from the Store by its location
+     * 
+     * @param entry
+     * @return the Object at that entry
+     */
+    public synchronized Object get(StoreEntry entry){
+        load();
+        return getValue(entry);
+    }
+
+    /**
+     * remove the Object at the StoreEntry
+     * 
+     * @param entry
+     * @return true if successful
+     */
+    public synchronized boolean remove(StoreEntry entry){
+        IndexItem item=(IndexItem)entry;
+        load();
+        boolean result=false;
+        if(item!=null){
+            clearCache();
+            remove(item);
+            result = true;
+        }
+        return result;
+    }
+
+    /**
+     * Get the StoreEntry for the first item of the list
+     * 
+     * @return the first StoreEntry or null if the list is empty
+     */
+    public synchronized StoreEntry getFirst(){
+        return indexList.getFirst();
+    }
+
+    /**
+     * Get yjr StoreEntry for the last item of the list
+     * 
+     * @return the last StoreEntry or null if the list is empty
+     */
+    public synchronized StoreEntry getLast(){
+        return indexList.getLast();
+    }
+
+    /**
+     * Get the next StoreEntry from the list
+     * 
+     * @param entry
+     * @return the next StoreEntry or null
+     */
+    public synchronized StoreEntry getNext(StoreEntry entry){
+        IndexItem item=(IndexItem)entry;
+        return indexList.getNextEntry(item);
+    }
+
+    /**
+     * Get the previous StoreEntry from the list
+     * 
+     * @param entry
+     * @return the previous store entry or null
+     */
+    public synchronized StoreEntry getPrevious(StoreEntry entry){
+        IndexItem item=(IndexItem)entry;
+        return indexList.getPrevEntry(item);
     }
     
     /**
-    * Retrieve an Object from the Store by its location
-    * @param entry
-    * @return the Object at that entry
-    */
-   public synchronized Object get(StoreEntry entry) {
-       load();
-       return getValue(entry);
-   }
-   
-   /**
-    * remove the Object at the StoreEntry
-    * @param entry
-    * @return true if successful
-    */
-   public synchronized boolean remove(StoreEntry entry) {
-       IndexItem item = (IndexItem)entry;
-       load();
-       boolean result = false;
-       if(item!=null){
-           clearCache();
-           IndexItem prev=indexList.getPrevEntry(item);
-           prev=prev!=null?prev:root;
-           IndexItem next=indexList.getNextEntry(item);
-           delete(item,prev,next);
-       }
-       return result;
-   }
+     * It's possible that a StoreEntry could be come stale
+     * this will return an upto date entry for the StoreEntry position
+     * @param entry old entry
+     * @return a refreshed StoreEntry
+     */
+    public synchronized StoreEntry refresh(StoreEntry entry) {
+        return indexList.getEntry(entry);
+    }
 
     protected IndexItem writeLast(Object value){
         IndexItem index=null;
@@ -782,7 +827,7 @@
         if(item!=null){
             try{
                 // ensure it's up to date
-                //item=indexList.getEntry(item);
+                // item=indexList.getEntry(item);
                 StoreLocation data=item.getValueDataItem();
                 result=dataManager.readItem(marshaller,data);
             }catch(IOException e){
@@ -903,8 +948,7 @@
     }
 
     /**
-     * @param cacheList
-     *            the cacheList to set
+     * @param cacheList the cacheList to set
      */
     public synchronized void setCacheList(LinkedList cacheList){
         this.cacheList=cacheList;
@@ -918,8 +962,7 @@
     }
 
     /**
-     * @param lastCached
-     *            the lastCached to set
+     * @param lastCached the lastCached to set
      */
     public synchronized void setLastCached(IndexItem lastCached){
         this.lastCached=lastCached;
@@ -933,8 +976,7 @@
     }
 
     /**
-     * @param maximumCacheSize
-     *            the maximumCacheSize to set
+     * @param maximumCacheSize the maximumCacheSize to set
      */
     public synchronized void setMaximumCacheSize(int maximumCacheSize){
         this.maximumCacheSize=maximumCacheSize;
@@ -948,12 +990,9 @@
     }
 
     /**
-     * @param offset
-     *            the offset to set
+     * @param offset the offset to set
      */
     public synchronized void setOffset(int offset){
         this.offset=offset;
     }
-
-   
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon Oct  9 06:05:20 2006
@@ -86,8 +86,16 @@
         delegate.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
     }
     
-    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
-        return delegate.getNextMessageToDeliver(clientId,subscriptionName);
+    public void resetBatching(String clientId,String subscriptionName,MessageId id) {
+        delegate.resetBatching(clientId,subscriptionName,id);
+    }
+    
+    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
+        return delegate.getNextMessageIdToDeliver(clientId,subscriptionName,id);
+    }
+    
+    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
+        return delegate.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
     }
     
     public ActiveMQDestination getDestination() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Mon Oct  9 06:05:20 2006
@@ -78,15 +78,37 @@
     public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
                     MessageRecoveryListener listener) throws Exception;
 
+    /**
+     * A hint to the Store to reset any batching state for a durable subsriber
+     * @param clientId 
+     * @param subscriptionName 
+     * @param nextToDispatch 
+     *
+     */
+    public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch);
+    
+    /**
+     * Get the next  messageId to deliver to a subscriber after the MessageId provided
+     * @param clientId
+     * @param subscriptionName
+     * @param id 
+     * @return the next messageId or null
+     * @throws IOException 
+     * @throws Exception 
+     */
+    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
+    
     
     /**
-     * Get the next un-acknowledged message to deliver to a subscriber
+     * Get the previous  messageId to deliver to a subscriber before the MessageId provided
      * @param clientId
      * @param subscriptionName
-     * @return the next message or null
+     * @param id 
+     * @return the next messageId or null
      * @throws IOException 
+     * @throws Exception 
      */
-    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException;
+    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
     
     
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Oct  9 06:05:20 2006
@@ -1,26 +1,22 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.store.jdbc;
 
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Set;
-
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -28,62 +24,69 @@
 /**
  * @version $Revision: 1.5 $
  */
-public interface JDBCAdapter {
-    
+public interface JDBCAdapter{
+
     public void setStatements(Statements statementProvider);
-    
-    public abstract void doCreateTables(TransactionContext c) throws SQLException, IOException;
 
-    public abstract void doDropTables(TransactionContext c) throws SQLException, IOException;
+    public abstract void doCreateTables(TransactionContext c) throws SQLException,IOException;
+
+    public abstract void doDropTables(TransactionContext c) throws SQLException,IOException;
 
-    public abstract void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data,
-            long expiration) throws SQLException, IOException;
-    public abstract void doAddMessageReference(TransactionContext c, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
+    public abstract void doAddMessage(TransactionContext c,MessageId messageID,ActiveMQDestination destination,
+            byte[] data,long expiration) throws SQLException,IOException;
 
-    public abstract byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException;
-    public abstract String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
+    public abstract void doAddMessageReference(TransactionContext c,MessageId messageId,
+            ActiveMQDestination destination,long expirationTime,String messageRef) throws SQLException,IOException;
 
-    public abstract void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException;
+    public abstract byte[] doGetMessage(TransactionContext c,long seq) throws SQLException,IOException;
 
-    public abstract void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
-            throws Exception;
+    public abstract String doGetMessageReference(TransactionContext c,long id) throws SQLException,IOException;
 
-    public abstract void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq) throws SQLException,
-            IOException;
+    public abstract void doRemoveMessage(TransactionContext c,long seq) throws SQLException,IOException;
 
-    public abstract void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
-            String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception;
-    
-    public abstract void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
-                    String subscriptionName, long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
+    public abstract void doRecover(TransactionContext c,ActiveMQDestination destination,
+            JDBCMessageRecoveryListener listener) throws Exception;
 
-    public abstract void doSetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId,
-            String subscriptionName, String selector, boolean retroactive) throws SQLException, IOException;
+    public abstract void doSetLastAck(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName,long seq) throws SQLException,IOException;
 
-    public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, 
-            String clientId, String subscriptionName)
-            throws SQLException, IOException;
+    public abstract void doRecoverSubscription(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName,JDBCMessageRecoveryListener listener) throws Exception;
 
-    public abstract long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException;
+    public abstract void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
 
-    public abstract void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;
+    public abstract void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException;
 
-    public abstract void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName, String clientId, String subscriptionName)
-            throws SQLException, IOException;
+    public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,
+            String clientId,String subscriptionName) throws SQLException,IOException;
 
-    public abstract void doDeleteOldMessages(TransactionContext c)
-        throws SQLException, IOException;
+    public abstract long getBrokerSequenceId(TransactionContext c,MessageId messageID) throws SQLException,IOException;
 
-    public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException;
+    public abstract void doRemoveAllMessages(TransactionContext c,ActiveMQDestination destinationName)
+            throws SQLException,IOException;
 
-    public abstract Set doGetDestinations(TransactionContext c) throws SQLException, IOException;
+    public abstract void doDeleteSubscription(TransactionContext c,ActiveMQDestination destinationName,String clientId,
+            String subscriptionName) throws SQLException,IOException;
+
+    public abstract void doDeleteOldMessages(TransactionContext c) throws SQLException,IOException;
+
+    public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,IOException;
+
+    public abstract Set doGetDestinations(TransactionContext c) throws SQLException,IOException;
 
     public abstract void setUseExternalMessageReferences(boolean useExternalMessageReferences);
 
-    public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
+    public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,ActiveMQDestination destination)
+            throws SQLException,IOException;
+
+    public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName) throws SQLException,IOException;
 
-    public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException, IOException;
-    
-    public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriptionName) throws SQLException, IOException;
+    public void doGetPrevDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
+            String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
 
+    public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
+            String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Mon Oct  9 06:05:20 2006
@@ -30,6 +30,7 @@
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @version $Revision: 1.6 $
@@ -110,12 +111,16 @@
                     });
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-            throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
+
         } finally {
             c.close();
         }
         
     }
+    
+    public void resetBatching(String clientId,String subscriptionName,MessageId id) {
+    }
+    
     /**
      * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
      *      boolean)
@@ -175,21 +180,75 @@
         }
     }
 
-    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
-        Message result = null;
-    
+    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
+        
+        final MessageId result = new MessageId();
+        final AtomicBoolean initalized = new AtomicBoolean();
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            byte[] data = adapter.doGetNextDurableSubscriberMessageStatement(c, destination, clientId, subscriptionName);
-            result = (Message) wireFormat.unmarshal(new ByteSequence(data));
+            long sequence = id != null ? id.getBrokerSequenceId() : -1;
+           adapter.doGetNextDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
+               public void recoverMessage(long sequenceId, byte[] data) throws Exception {
+                   Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+                   msg.getMessageId().setBrokerSequenceId(sequenceId);
+                   result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
+                   initalized.set(true);
+                   
+               }
+               public void recoverMessageReference(String reference) throws Exception {
+                   result.setValue(reference);
+                   initalized.set(true);
+                   
+               }
+               
+               public void finished(){          
+               }
+           });
+           
                
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-            throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
+            throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
         } finally {
             c.close();
         }
-        return result;
+        return initalized.get () ? result : null;
+    }
+    
+    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
+        final MessageId result = new MessageId();
+        final AtomicBoolean initalized = new AtomicBoolean();
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        try {
+            long sequence = id != null ? id.getBrokerSequenceId() : -1;
+           adapter.doGetPrevDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
+               public void recoverMessage(long sequenceId, byte[] data) throws Exception {
+                   Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+                   msg.getMessageId().setBrokerSequenceId(sequenceId);
+                   result.setProducerId(msg.getMessageId().getProducerId());
+                   result.setProducerSequenceId(msg.getMessageId().getProducerSequenceId());
+                   result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
+                   initalized.set(true);
+                   
+               }
+               public void recoverMessageReference(String reference) throws Exception {
+                   result.setValue(reference);
+                   initalized.set(true);
+                   
+               }
+               
+               public void finished(){          
+               }
+           });
+           
+               
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+        return initalized.get () ? result : null;
     }
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
@@ -200,7 +259,7 @@
                
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-            throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
+            throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
         } finally {
             c.close();
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Mon Oct  9 06:05:20 2006
@@ -64,6 +64,8 @@
     private String lockUpdateStatement;
     private String nextDurableSubscriberMessageStatement;
     private String durableSubscriberMessageCountStatement;
+    private String nextDurableSubscriberMessageIdStatement;
+    private String prevDurableSubscriberMessageIdStatement;
     private boolean useLockCreateWhereClause;
 
     public String[] getCreateSchemaStatements() {
@@ -210,10 +212,9 @@
     
     public String getFindDurableSubMessagesStatement(){
         if(findDurableSubMessagesStatement==null){
-            findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
-                            +getFullAckTableName()+" D "+" WHERE ?>= ( SELECT COUNT(*) FROM "
-                            +getFullMessageTableName()+" M, " +  getFullAckTableName() + " D WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
-                            +" AND M.CONTAINER=D.CONTAINER AND M.ID > ?)"+" ORDER BY M.ID)";
+            findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+            + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+            + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID";
         }
         return findDurableSubMessagesStatement;
     }
@@ -229,10 +230,9 @@
     
     public String getNextDurableSubscriberMessageStatement(){
         if (nextDurableSubscriberMessageStatement == null){
-            nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
-            +getFullAckTableName()+" D "+" WHERE 1 >= ( SELECT COUNT(*) FROM "
-            +getFullMessageTableName()+" M, WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
-            +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+") ORDER BY M.ID)"; 
+            nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+            + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+            + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID ";
         }
         return nextDurableSubscriberMessageStatement;
     }
@@ -240,14 +240,55 @@
     /**
      * @return the durableSubscriberMessageCountStatement
      */
+    
+    
     public String getDurableSubscriberMessageCountStatement(){
         if (durableSubscriberMessageCountStatement==null){
-            durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM "
-            +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
-            +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
+            durableSubscriberMessageCountStatement =  "SELECT COUNT(*) FROM " + getFullMessageTableName() + " M, "
+            + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+            + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
         }
         return durableSubscriberMessageCountStatement;
     }
+    
+    /**
+     * @return the nextDurableSubscriberMessageIdStatement
+     */
+    public String getNextDurableSubscriberMessageIdStatement(){
+        if (nextDurableSubscriberMessageIdStatement==null) {
+            nextDurableSubscriberMessageIdStatement =
+                "SELECT M.ID FROM " + getFullMessageTableName() + " M, "
+                 + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+                + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID ";
+        }
+        return nextDurableSubscriberMessageIdStatement;
+    }
+    
+    /**
+     * @return the prevDurableSubscriberMessageIdStatement
+     */
+   /*
+    public String getPrevDurableSubscriberMessageIdStatement(){
+        if(prevDurableSubscriberMessageIdStatement==null) {
+            prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+            + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+            + " AND M.CONTAINER=D.CONTAINER AND M.ID < ?" + " ORDER BY M.ID ";
+        }
+        return prevDurableSubscriberMessageIdStatement;
+    }
+    */
+   
+   
+    public String getPrevDurableSubscriberMessageIdStatement(){
+        if(prevDurableSubscriberMessageIdStatement==null) {
+            prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M "
+            + " WHERE M.CONTAINER=? "
+            + "  AND M.ID <?" + "  ORDER BY M.ID DESC ";
+        }
+        return prevDurableSubscriberMessageIdStatement;
+    }
+    
+
 
     public String getFindAllDestinationsStatement() {
         if (findAllDestinationsStatement == null) {
@@ -564,5 +605,26 @@
      */
     public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){
         this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement;
+    }
+
+    
+    
+
+    
+    /**
+     * @param nextDurableSubscriberMessageIdStatement the nextDurableSubscriberMessageIdStatement to set
+     */
+    public void setNextDurableSubscriberMessageIdStatement(String nextDurableSubscriberMessageIdStatement){
+        this.nextDurableSubscriberMessageIdStatement=nextDurableSubscriberMessageIdStatement;
+    }
+
+    
+   
+    
+    /**
+     * @param prevDurableSubscriberMessageIdStatement the prevDurableSubscriberMessageIdStatement to set
+     */
+    public void setPrevDurableSubscriberMessageIdStatement(String prevDurableSubscriberMessageIdStatement){
+        this.prevDurableSubscriberMessageIdStatement=prevDurableSubscriberMessageIdStatement;
     }
 }