You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/17 11:34:00 UTC

svn commit: r476101 [1/2] - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/cursors/ store/ store/jdbc/ store/jdbc/adapter/ store/journal/ store/kahadaptor/ store/memory/ store/rapid/

Author: rajdavies
Date: Fri Nov 17 02:33:57 2006
New Revision: 476101

URL: http://svn.apache.org/viewvc?view=rev&rev=476101
Log:
Adding persitent cursor support for Queues

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=auto&rev=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Fri Nov 17 02:33:57 2006
@@ -0,0 +1,142 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import javax.jms.JMSException;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * perist pending messages pending message (messages awaiting disptach to a
+ * consumer) cursor
+ * 
+ * @version $Revision: 474985 $
+ */
+class QueueStorePrefetch extends AbstractPendingMessageCursor implements
+        MessageRecoveryListener {
+
+    static private final Log log=LogFactory.getLog(QueueStorePrefetch.class);
+   
+    private MessageStore store;
+    private final LinkedList batchList=new LinkedList();
+    private Destination regionDestination;
+
+    /**
+     * @param topic
+     * @param clientId
+     * @param subscriberName
+     * @throws IOException
+     */
+    public QueueStorePrefetch(Queue queue){
+        this.regionDestination = queue;
+        this.store=(MessageStore)queue.getMessageStore();
+        
+    }
+
+    public void start() throws Exception{
+    }
+
+    public void stop() throws Exception{
+        store.resetBatching();
+    }
+
+    /**
+     * @return true if there are no pending messages
+     */
+    public boolean isEmpty(){
+        return batchList.isEmpty();
+    }
+    
+    public synchronized int size(){
+        try {
+        return store.getMessageCount();
+        }catch(IOException e) {
+            log.error("Failed to get message count",e);
+            throw new RuntimeException(e);
+        }
+    }
+    
+    public synchronized void addMessageLast(MessageReference node) throws Exception{
+        if(node!=null){
+            node.decrementReferenceCount();
+        }
+    }
+
+    public synchronized boolean hasNext(){
+        if(isEmpty()){
+            try{
+                fillBatch();
+            }catch(Exception e){
+                log.error("Failed to fill batch",e);
+                throw new RuntimeException(e);
+            }
+        }
+        return !isEmpty();
+    }
+
+    public synchronized MessageReference next(){
+        Message result = (Message)batchList.removeFirst();
+        result.setRegionDestination(regionDestination);
+        return result;
+    }
+
+    public void reset(){
+    }
+
+    // MessageRecoveryListener implementation
+    public void finished(){
+    }
+
+    public void recoverMessage(Message message) throws Exception{
+        message.setRegionDestination(regionDestination);
+        message.incrementReferenceCount();
+        batchList.addLast(message);
+    }
+
+    public void recoverMessageReference(String messageReference)
+            throws Exception{
+        // shouldn't get called
+        throw new RuntimeException("Not supported");
+    }
+
+    // implementation
+    protected void fillBatch() throws Exception{
+        store.recoverNextMessages(maxBatchSize,this);
+        // this will add more messages to the batch list
+        if(!batchList.isEmpty()){
+            Message message=(Message)batchList.getLast();
+        }
+    }
+    
+    public String toString() {
+        return "QueueStorePrefetch" + System.identityHashCode(this) ;
+    }
+    
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Fri Nov 17 02:33:57 2006
@@ -27,7 +27,6 @@
 import org.apache.activemq.kaha.Store;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
@@ -142,12 +141,6 @@
                 TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
                 if(tsp!=null){
                     tsp.addMessageLast(node);
-                    if(started){
-                        // if the store has been empty - then this message is next to dispatch
-                        if((pendingCount-nonPersistent.size())<=0){
-                            tsp.nextToDispatch(node.getMessageId());
-                        }
-                    }
                 }
             }
         }
@@ -190,6 +183,7 @@
     }
 
     public synchronized void reset(){
+        nonPersistent.reset();
         for(Iterator i=storePrefetches.iterator();i.hasNext();){
             AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
             tsp.reset();
@@ -199,21 +193,27 @@
     public int size(){
         return pendingCount;
     }
+    
+    public synchronized void setMaxBatchSize(int maxBatchSize){
+        for(Iterator i=storePrefetches.iterator();i.hasNext();){
+            AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
+            tsp.setMaxBatchSize(maxBatchSize);
+        }
+        super.setMaxBatchSize(maxBatchSize);
+    }
 
     protected synchronized PendingMessageCursor getNextCursor() throws Exception{
         if(currentCursor==null||currentCursor.isEmpty()){
             currentCursor=null;
             for(Iterator i=storePrefetches.iterator();i.hasNext();){
-                AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
-                tsp.setMaxBatchSize(getMaxBatchSize());
+                AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next(); 
                 if(tsp.hasNext()){
                     currentCursor=tsp;
                     break;
                 }
             }
             // round-robin
-            Object obj=storePrefetches.removeFirst();
-            storePrefetches.addLast(obj);
+            storePrefetches.addLast(storePrefetches.removeFirst());
         }
         return currentCursor;
     }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=auto&rev=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Fri Nov 17 02:33:57 2006
@@ -0,0 +1,170 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.Iterator;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.Store;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Store based Cursor for Queues
+ * 
+ * @version $Revision: 474985 $
+ */
+public class StoreQueueCursor extends AbstractPendingMessageCursor{
+
+    static private final Log log=LogFactory.getLog(StoreQueueCursor.class);
+    private int pendingCount=0;
+    private Queue queue;
+    private Store tmpStore;
+    private PendingMessageCursor nonPersistent;
+    private QueueStorePrefetch persistent;
+    private boolean started;
+    private PendingMessageCursor currentCursor;
+   
+    /**
+     * Construct
+     * 
+     * @param queue
+     * @param tmpStore
+     */
+    public StoreQueueCursor(Queue queue,Store tmpStore){
+        this.queue=queue;
+        this.tmpStore=tmpStore;
+        this.persistent=new QueueStorePrefetch(queue);
+    }
+
+    public synchronized void start() throws Exception{
+        started=true;
+        if(nonPersistent==null){
+            nonPersistent=new FilePendingMessageCursor(queue.getDestination(),tmpStore);
+            nonPersistent.setMaxBatchSize(getMaxBatchSize());
+        }
+        nonPersistent.start();
+        pendingCount=persistent.size();
+    }
+
+    public synchronized void stop() throws Exception{
+        started=false;
+        if(nonPersistent!=null){
+            nonPersistent.stop();
+        }
+        pendingCount=0;
+    }
+
+    public synchronized void addMessageLast(MessageReference node) throws Exception{
+        if(node!=null){
+            Message msg=node.getMessage();
+            if(started){
+                pendingCount++;
+                if(!msg.isPersistent()){
+                    nonPersistent.addMessageLast(node);
+                }
+            }
+            if(msg.isPersistent()){
+                persistent.addMessageLast(node);
+            }
+        }
+    }
+
+    public void clear(){
+        pendingCount=0;
+    }
+
+    public synchronized boolean hasNext(){
+        boolean result=pendingCount>0;
+        if(result){
+            try{
+                currentCursor=getNextCursor();
+            }catch(Exception e){
+                log.error("Failed to get current cursor ",e);
+                throw new RuntimeException(e);
+            }
+            result=currentCursor!=null?currentCursor.hasNext():false;
+        }
+        return result;
+    }
+
+    public synchronized MessageReference next(){
+        return currentCursor!=null?currentCursor.next():null;
+    }
+
+    public synchronized void remove(){
+        if(currentCursor!=null){
+            currentCursor.remove();
+        }
+        pendingCount--;
+    }
+
+    public void remove(MessageReference node){
+        pendingCount--;
+    }
+
+    public synchronized void reset(){
+        nonPersistent.reset();
+    }
+
+    public int size(){
+        return pendingCount;
+    }
+
+    public synchronized boolean isEmpty(){
+        return pendingCount<=0;
+    }
+
+    /**
+     * Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
+     * may do
+     * 
+     * @see org.apache.activemq.region.cursors.PendingMessageCursor
+     * @return true if recovery required
+     */
+    public boolean isRecoveryRequired(){
+        return false;
+    }
+
+    /**
+     * @return the nonPersistent Cursor
+     */
+    public PendingMessageCursor getNonPersistent(){
+        return this.nonPersistent;
+    }
+
+    /**
+     * @param nonPersistent cursor to set
+     */
+    public void setNonPersistent(PendingMessageCursor nonPersistent){
+        this.nonPersistent=nonPersistent;
+    }
+
+    public void setMaxBatchSize(int maxBatchSize){
+        persistent.setMaxBatchSize(maxBatchSize);
+        if(nonPersistent!=null){
+            nonPersistent.setMaxBatchSize(maxBatchSize);
+        }
+        super.setMaxBatchSize(maxBatchSize);
+    }
+
+    protected synchronized PendingMessageCursor getNextCursor() throws Exception{
+        if(currentCursor==null||currentCursor.isEmpty()){
+            currentCursor = currentCursor == persistent ? nonPersistent : persistent;
+        }
+        return currentCursor;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Fri Nov 17 02:33:57 2006
@@ -46,7 +46,6 @@
     private final LinkedList batchList=new LinkedList();
     private String clientId;
     private String subscriberName;
-    private MessageId lastMessageId;
     private Destination regionDestination;
 
     /**
@@ -66,7 +65,7 @@
     }
 
     public void stop() throws Exception{
-        store.resetBatching(clientId,clientId,null);
+        store.resetBatching(clientId,subscriberName);
     }
 
     /**
@@ -130,12 +129,12 @@
 
     // implementation
     protected void fillBatch() throws Exception{
-        store.recoverNextMessages(clientId,subscriberName,lastMessageId,
+        store.recoverNextMessages(clientId,subscriberName,
                 maxBatchSize,this);
         // this will add more messages to the batch list
         if(!batchList.isEmpty()){
             Message message=(Message)batchList.getLast();
-            lastMessageId=message.getMessageId();
+          
         }
     }
     
@@ -143,8 +142,4 @@
         return "TopicStorePrefetch" + System.identityHashCode(this) + "("+clientId+","+subscriberName+")";
     }
     
-    synchronized void nextToDispatch(MessageId id) throws Exception {
-        lastMessageId = store.getPreviousMessageIdToDeliver(clientId,clientId,id);
-        store.resetBatching(clientId,clientId,id);        
-    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Fri Nov 17 02:33:57 2006
@@ -1,24 +1,20 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.store;
 
 import java.io.IOException;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -28,70 +24,84 @@
 import org.apache.activemq.memory.UsageManager;
 
 /**
- * Represents a message store which is used by the persistent {@link org.apache.activemq.service.MessageContainer}
+ * Represents a message store which is used by the persistent 
  * implementations
- *
+ * 
  * @version $Revision: 1.5 $
  */
-public interface MessageStore extends Service {
-    
+public interface MessageStore extends Service{
+
     /**
      * Adds a message to the message store
-     * @param context TODO
+     * 
+     * @param context context
+     * @param message 
+     * @throws IOException 
      */
-    public void addMessage(ConnectionContext context, Message message) throws IOException;
+    public void addMessage(ConnectionContext context,Message message) throws IOException;
 
     /**
      * Adds a message reference to the message store
-     * @param context TODO
-     * @param messageId TODO
-     * @param expirationTime TODO
-     */
-    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException;
+     * 
+     * @param context 
+     * @param messageId 
+     * @param expirationTime 
+     * @param messageRef 
+     * @throws IOException 
+     */
+    public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
+            throws IOException;
 
     /**
-     * Looks up a message using either the String messageID or
-     * the messageNumber. Implementations are encouraged to fill in the missing
-     * key if its easy to do so.
+     * Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill
+     * in the missing key if its easy to do so.
+     * 
      * @param identity which contains either the messageID or the messageNumber
      * @return the message or null if it does not exist
+     * @throws IOException 
      */
     public Message getMessage(MessageId identity) throws IOException;
 
     /**
-     * Looks up a message using either the String messageID or
-     * the messageNumber. Implementations are encouraged to fill in the missing
-     * key if its easy to do so.
+     * Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill
+     * in the missing key if its easy to do so.
+     * 
      * @param identity which contains either the messageID or the messageNumber
      * @return the message or null if it does not exist
+     * @throws IOException 
      */
     public String getMessageReference(MessageId identity) throws IOException;
 
     /**
      * Removes a message from the message store.
-     * @param context TODO
-     * @param ack the ack request that cause the message to be removed.  It conatins 
-     *   the identity which contains the messageID of the message that needs to be removed.
+     * 
+     * @param context 
+     * @param ack the ack request that cause the message to be removed. It conatins the identity which contains the
+     *            messageID of the message that needs to be removed.
+     * @throws IOException 
      */
-    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
+    public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException;
 
     /**
      * Removes all the messages from the message store.
-     * @param context TODO
+     * 
+     * @param context 
+     * @throws IOException 
      */
     public void removeAllMessages(ConnectionContext context) throws IOException;
-    
+
     /**
      * Recover any messages to be delivered.
-     *
+     * 
      * @param container
-     * @throws Exception 
+     * @throws Exception
      */
     public void recover(MessageRecoveryListener container) throws Exception;
 
     /**
      * The destination that the message store is holding messages for.
-     * @return
+     * 
+     * @return the destination
      */
     public ActiveMQDestination getDestination();
 
@@ -100,4 +110,23 @@
      */
     public void setUsageManager(UsageManager usageManager);
 
+    /**
+     * @return the number of messages ready to deliver
+     * @throws IOException 
+     * 
+     */
+    public int getMessageCount() throws IOException;
+
+    /**
+     * A hint to the Store to reset any batching state for the Destination
+     * 
+     * @param nextToDispatch
+     * 
+     */
+    public void resetBatching();
+
+    
+    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener)
+            throws Exception;
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Fri Nov 17 02:33:57 2006
@@ -104,6 +104,4 @@
      * @param usageManager The UsageManager that is controlling the broker's memory usage.
      */
     public void setUsageManager(UsageManager usageManager);
-
-   
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Fri Nov 17 02:33:57 2006
@@ -77,4 +77,20 @@
     public void setUsageManager(UsageManager usageManager) {
         delegate.setUsageManager(usageManager);
     }
+
+ 
+    public int getMessageCount() throws IOException{
+        return delegate.getMessageCount();
+    }
+
+
+    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+       delegate.recoverNextMessages(maxReturned,listener);
+        
+    }
+
+    public void resetBatching(){
+        delegate.resetBatching();
+        
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -82,22 +82,15 @@
         delegate.recoverSubscription(clientId, subscriptionName, listener);
     }
     
-    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
-        delegate.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
+        delegate.recoverNextMessages(clientId, subscriptionName, maxReturned,listener);
     }
     
-    public void resetBatching(String clientId,String subscriptionName,MessageId id) {
-        delegate.resetBatching(clientId,subscriptionName,id);
-    }
-    
-    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
-        return delegate.getNextMessageIdToDeliver(clientId,subscriptionName,id);
-    }
-    
-    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
-        return delegate.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
+    public void resetBatching(String clientId,String subscriptionName) {
+        delegate.resetBatching(clientId,subscriptionName);
     }
     
+       
     public ActiveMQDestination getDestination() {
         return delegate.getDestination();
     }
@@ -120,4 +113,19 @@
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         return delegate.getMessageCount(clientId,subscriberName);
     }    
+    
+   
+    public int getMessageCount() throws IOException{
+        return delegate.getMessageCount();
+    }
+
+    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+       delegate.recoverNextMessages(maxReturned,listener);
+        
+    }
+
+    public void resetBatching(){
+        delegate.resetBatching();
+        
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -69,46 +69,21 @@
      * 
      * @param clientId
      * @param subscriptionName
-     * @param lastMessageId
      * @param maxReturned
      * @param listener
      * 
      * @throws Exception
      */
-    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
                     MessageRecoveryListener listener) throws Exception;
 
     /**
      * A hint to the Store to reset any batching state for a durable subsriber
      * @param clientId 
      * @param subscriptionName 
-     * @param nextToDispatch 
      *
      */
-    public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch);
-    
-    /**
-     * Get the next  messageId to deliver to a subscriber after the MessageId provided
-     * @param clientId
-     * @param subscriptionName
-     * @param id 
-     * @return the next messageId or null
-     * @throws IOException 
-     * @throws Exception 
-     */
-    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
-    
-    
-    /**
-     * Get the previous  messageId to deliver to a subscriber before the MessageId provided
-     * @param clientId
-     * @param subscriptionName
-     * @param id 
-     * @return the next messageId or null
-     * @throws IOException 
-     * @throws Exception 
-     */
-    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
+    public void resetBatching(String clientId,String subscriptionName);
     
     
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Fri Nov 17 02:33:57 2006
@@ -89,4 +89,9 @@
 
     public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
             String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
-}
+    
+    public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
+    
+    public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,
+            JDBCMessageRecoveryListener listener) throws Exception;
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri Nov 17 02:33:57 2006
@@ -19,7 +19,7 @@
 
 import java.io.IOException;
 import java.sql.SQLException;
-
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -33,6 +33,7 @@
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
+
 /**
  * @version $Revision: 1.10 $
  */
@@ -42,6 +43,7 @@
     protected final ActiveMQDestination destination;
     protected final JDBCAdapter adapter;
     protected final JDBCPersistenceAdapter persistenceAdapter;
+    protected AtomicLong lastMessageId = new AtomicLong(-1);
 
     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
             ActiveMQDestination destination) {
@@ -201,4 +203,67 @@
     public void setUsageManager(UsageManager usageManager) {
         // we can ignore since we don't buffer up messages.
     }
+
+  
+    public int getMessageCount() throws IOException{
+        int result = 0;
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        try {
+            
+            result = adapter.doGetMessageCount(c, destination);
+               
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+        return result;
+    }
+
+    /**
+     * @param maxReturned
+     * @param listener
+     * @throws Exception
+     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, org.apache.activemq.store.MessageRecoveryListener)
+     */
+    public void recoverNextMessages(int maxReturned,final MessageRecoveryListener listener) throws Exception{
+        TransactionContext c=persistenceAdapter.getTransactionContext();
+        
+        try{
+            adapter.doRecoverNextMessages(c,destination,lastMessageId.get(),maxReturned,
+                    new JDBCMessageRecoveryListener(){
+
+                        public void recoverMessage(long sequenceId,byte[] data) throws Exception{
+                            Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
+                            msg.getMessageId().setBrokerSequenceId(sequenceId);
+                            listener.recoverMessage(msg);
+                            lastMessageId.set(sequenceId);
+                        }
+
+                        public void recoverMessageReference(String reference) throws Exception{
+                            listener.recoverMessageReference(reference);
+                        }
+
+                        public void finished(){
+                            listener.finished();
+                        }
+                    });
+        }catch(SQLException e){
+            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+        }finally{
+            c.close();
+        }
+        
+    }
+
+    /**
+     * 
+     * @see org.apache.activemq.store.MessageStore#resetBatching()
+     */
+    public void resetBatching(){
+        lastMessageId.set(-1);
+        
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -19,7 +19,9 @@
 
 import java.io.IOException;
 import java.sql.SQLException;
-
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
@@ -30,13 +32,14 @@
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
-import java.util.concurrent.atomic.AtomicBoolean;
+
 
 /**
  * @version $Revision: 1.6 $
  */
 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
 
+    private Map subscriberLastMessageMap=new ConcurrentHashMap();
     public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
             ActiveMQTopic topic) {
         super(persistenceAdapter, adapter, wireFormat, topic);
@@ -90,35 +93,46 @@
         }
     }
 
-    public void recoverNextMessages(final String clientId,final String subscriptionName, final MessageId lastMessageId,final int maxReturned,final MessageRecoveryListener listener) throws Exception{
-        TransactionContext c = persistenceAdapter.getTransactionContext();
-        try {
-            long lastSequence = lastMessageId != null ? lastMessageId.getBrokerSequenceId() : -1;
-            adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,lastSequence,maxReturned,
-                    new JDBCMessageRecoveryListener() {
-                        public void recoverMessage(long sequenceId, byte[] data) throws Exception {
-                            Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+    public synchronized void recoverNextMessages(final String clientId,final String subscriptionName,
+            final int maxReturned,final MessageRecoveryListener listener) throws Exception{
+        TransactionContext c=persistenceAdapter.getTransactionContext();
+        String subcriberId=getSubscriptionKey(clientId,subscriptionName);
+        AtomicLong last=(AtomicLong)subscriberLastMessageMap.get(subcriberId);
+        if(last==null){
+            last=new AtomicLong(-1);
+            subscriberLastMessageMap.put(subcriberId,last);
+        }
+        final AtomicLong finalLast=last;
+        try{
+            adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned,
+                    new JDBCMessageRecoveryListener(){
+
+                        public void recoverMessage(long sequenceId,byte[] data) throws Exception{
+                            Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
                             msg.getMessageId().setBrokerSequenceId(sequenceId);
                             listener.recoverMessage(msg);
+                            finalLast.set(sequenceId);
                         }
-                        public void recoverMessageReference(String reference) throws Exception {
+
+                        public void recoverMessageReference(String reference) throws Exception{
                             listener.recoverMessageReference(reference);
                         }
-                        
+
                         public void finished(){
                             listener.finished();
                         }
                     });
-        } catch (SQLException e) {
+        }catch(SQLException e){
             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-
-        } finally {
+        }finally{
             c.close();
+            last.set(finalLast.get());
         }
-        
     }
     
-    public void resetBatching(String clientId,String subscriptionName,MessageId id) {
+    public void resetBatching(String clientId,String subscriptionName) {
+        String subcriberId=getSubscriptionKey(clientId,subscriptionName);
+        subscriberLastMessageMap.remove(subcriberId);
     }
     
     /**
@@ -165,6 +179,7 @@
             throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
         } finally {
             c.close();
+            resetBatching(clientId,subscriptionName);
         }
     }
 
@@ -180,76 +195,9 @@
         }
     }
 
-    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
-        
-        final MessageId result = new MessageId();
-        final AtomicBoolean initalized = new AtomicBoolean();
-        TransactionContext c = persistenceAdapter.getTransactionContext();
-        try {
-            long sequence = id != null ? id.getBrokerSequenceId() : -1;
-           adapter.doGetNextDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
-               public void recoverMessage(long sequenceId, byte[] data) throws Exception {
-                   Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
-                   msg.getMessageId().setBrokerSequenceId(sequenceId);
-                   result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
-                   initalized.set(true);
-                   
-               }
-               public void recoverMessageReference(String reference) throws Exception {
-                   result.setValue(reference);
-                   initalized.set(true);
-                   
-               }
-               
-               public void finished(){          
-               }
-           });
-           
-               
-        } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-            throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
-        } finally {
-            c.close();
-        }
-        return initalized.get () ? result : null;
-    }
     
-    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
-        final MessageId result = new MessageId();
-        final AtomicBoolean initalized = new AtomicBoolean();
-        TransactionContext c = persistenceAdapter.getTransactionContext();
-        try {
-            long sequence = id != null ? id.getBrokerSequenceId() : -1;
-           adapter.doGetPrevDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
-               public void recoverMessage(long sequenceId, byte[] data) throws Exception {
-                   Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
-                   msg.getMessageId().setBrokerSequenceId(sequenceId);
-                   result.setProducerId(msg.getMessageId().getProducerId());
-                   result.setProducerSequenceId(msg.getMessageId().getProducerSequenceId());
-                   result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
-                   initalized.set(true);
-                   
-               }
-               public void recoverMessageReference(String reference) throws Exception {
-                   result.setValue(reference);
-                   initalized.set(true);
-                   
-               }
-               
-               public void finished(){          
-               }
-           });
-           
-               
-        } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-            throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
-        } finally {
-            c.close();
-        }
-        return initalized.get () ? result : null;
-    }
+    
+    
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         int result = 0;
@@ -263,6 +211,12 @@
         } finally {
             c.close();
         }
+        return result;
+    }
+    
+    protected String getSubscriptionKey(String clientId,String subscriberName){
+        String result=clientId+":";
+        result+=subscriberName!=null?subscriberName:"NOT_SET";
         return result;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Fri Nov 17 02:33:57 2006
@@ -66,6 +66,8 @@
     private String durableSubscriberMessageCountStatement;
     private String nextDurableSubscriberMessageIdStatement;
     private String prevDurableSubscriberMessageIdStatement;
+    private String destinationMessageCountStatement;
+    private String findNextMessagesStatement;
     private boolean useLockCreateWhereClause;
 
     public String[] getCreateSchemaStatements() {
@@ -338,6 +340,29 @@
         }
         return lockUpdateStatement;
     }
+    
+    /**
+     * @return the destinationMessageCountStatement
+     */
+    public String getDestinationMessageCountStatement(){
+        if (destinationMessageCountStatement==null) {
+            destinationMessageCountStatement= "SELECT COUNT(*) FROM " + getFullMessageTableName()
+            + " WHERE CONTAINER=?";
+        }
+        return destinationMessageCountStatement;
+    }
+
+    /**
+     * @return the findNextMessagesStatement
+     */
+    public String getFindNextMessagesStatement(){
+        if(findNextMessagesStatement == null) {            
+            findNextMessagesStatement="SELECT ID, MSG FROM " + getFullMessageTableName()
+            + " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
+        }
+        return findNextMessagesStatement;
+    }
+
 
     public String getFullMessageTableName() {
         return getTablePrefix() + getMessageTableName();
@@ -627,4 +652,22 @@
     public void setPrevDurableSubscriberMessageIdStatement(String prevDurableSubscriberMessageIdStatement){
         this.prevDurableSubscriberMessageIdStatement=prevDurableSubscriberMessageIdStatement;
     }
-}
+    
+    /**
+     * @param findNextMessagesStatement the findNextMessagesStatement to set
+     */
+    public void setFindNextMessagesStatement(String findNextMessagesStatement){
+        this.findNextMessagesStatement=findNextMessagesStatement;
+    }
+
+    /**
+     * @param destinationMessageCountStatement the destinationMessageCountStatement to set
+     */
+    public void setDestinationMessageCountStatement(String destinationMessageCountStatement){
+        this.destinationMessageCountStatement=destinationMessageCountStatement;
+    }
+    
+    
+    
+   
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Nov 17 02:33:57 2006
@@ -677,6 +677,54 @@
             close(s);
         }
     }
+    
+   
+    public int doGetMessageCount(TransactionContext c,ActiveMQDestination destination) throws SQLException, IOException{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        int result=0;
+        try{
+            s=c.getConnection().prepareStatement(statements.getDestinationMessageCountStatement());
+            s.setString(1,destination.getQualifiedName());
+            rs=s.executeQuery();
+            if(rs.next()){
+                result=rs.getInt(1);
+            }
+        }finally{
+            close(rs);
+            close(s);
+        }
+        return result;
+    }
+
+    
+    public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setLong(4,nextSeq);
+            rs=s.executeQuery();
+            int count=0;
+            if(statements.isUseExternalMessageReferences()){
+                while(rs.next()&&count<maxReturned){
+                    listener.recoverMessageReference(rs.getString(1));
+                    count++;
+                }
+            }else{
+                while(rs.next()&&count<maxReturned){
+                    listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
+                    count++;
+                }
+            }
+        }finally{
+            close(rs);
+            close(s);
+            listener.finished();
+        }
+        
+    }
     /*
      * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
      * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
@@ -700,4 +748,6 @@
      * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
      * try { s.close(); } catch (Throwable ignore) {} } }
      */
+
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Fri Nov 17 02:33:57 2006
@@ -381,4 +381,27 @@
         throw new IOException("The journal does not support message references.");
     }
 
-}
+    /**
+     * @return
+     * @throws IOException 
+     * @see org.apache.activemq.store.MessageStore#getMessageCount()
+     */
+    public int getMessageCount() throws IOException{
+        peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getMessageCount();
+    }
+
+   
+    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+        peristenceAdapter.checkpoint(true, true);
+        longTermStore.recoverNextMessages(maxReturned,listener);
+        
+    }
+
+    
+    public void resetBatching(){
+        longTermStore.resetBatching();
+        
+    }
+
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -58,9 +58,9 @@
         longTermStore.recoverSubscription(clientId, subscriptionName, listener);
     }
     
-    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
         this.peristenceAdapter.checkpoint(true, true);
-        longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
+        longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned,listener);
         
     }
 
@@ -190,25 +190,16 @@
         return longTermStore.getAllSubscriptions();
     }
 
-    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
-        this.peristenceAdapter.checkpoint(true, true);
-        return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id);
-    }
     
-    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
-        this.peristenceAdapter.checkpoint(true, true);
-        return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
-    }
-
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         this.peristenceAdapter.checkpoint(true, true);
         return longTermStore.getMessageCount(clientId,subscriberName);
     }
     
-    public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch) {
-        longTermStore.resetBatching(clientId,subscriptionName,nextToDispatch);
+    public void resetBatching(String clientId,String subscriptionName) {
+        longTermStore.resetBatching(clientId,subscriptionName);
     }
 
     
 
-}
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java Fri Nov 17 02:33:57 2006
@@ -415,4 +415,23 @@
         throw new IOException("The journal does not support message references.");
     }
 
-}
+   
+    public int getMessageCount() throws IOException{
+        peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getMessageCount();
+    }
+
+    
+    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+        peristenceAdapter.checkpoint(true, true);
+        longTermStore.recoverNextMessages(maxReturned,listener);
+        
+    }
+
+    
+    public void resetBatching(){
+       longTermStore.resetBatching();
+        
+    }
+
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -72,9 +72,9 @@
 
     }
     
-    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId, int maxReturned,final MessageRecoveryListener listener) throws Exception{
+    public void recoverNextMessages(String clientId,String subscriptionName, int maxReturned,final MessageRecoveryListener listener) throws Exception{
         this.peristenceAdapter.checkpoint(true, true);
-        longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,new MessageRecoveryListener() {
+        longTermStore.recoverNextMessages(clientId, subscriptionName,maxReturned,new MessageRecoveryListener() {
             public void recoverMessage(Message message) throws Exception {
                 throw new IOException("Should not get called.");
             }
@@ -217,26 +217,16 @@
         return longTermStore.getAllSubscriptions();
     }
     
-    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
-        this.peristenceAdapter.checkpoint(true, true);
-        return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id);
-    }
-    
-    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
-        this.peristenceAdapter.checkpoint(true, true);
-        return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
-    }
-
-    
+       
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         this.peristenceAdapter.checkpoint(true, true);
         return longTermStore.getMessageCount(clientId,subscriberName);
     }
     
-    public void resetBatching(String clientId,String subscriptionName,MessageId nextId) {
-        longTermStore.resetBatching(clientId,subscriptionName,nextId);
+    public void resetBatching(String clientId,String subscriptionName) {
+        longTermStore.resetBatching(clientId,subscriptionName);
     }
 
    
 
-}
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Fri Nov 17 02:33:57 2006
@@ -26,6 +26,7 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.memory.UsageListener;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
@@ -35,10 +36,12 @@
  * 
  * @version $Revision: 1.7 $
  */
-public class KahaMessageStore implements MessageStore{
+public class KahaMessageStore implements MessageStore, UsageListener{
     protected final ActiveMQDestination destination;
     protected final ListContainer messageContainer;
+    protected StoreEntry batchEntry = null;
     protected final LRUCache cache;
+    protected UsageManager usageManager;
 
     public KahaMessageStore(ListContainer container,ActiveMQDestination destination, int maximumCacheSize) throws IOException{
         this.messageContainer=container;
@@ -73,19 +76,19 @@
 
     public synchronized Message getMessage(MessageId identity) throws IOException{
         Message result=null;
-        StoreEntry entry=(StoreEntry)cache.remove(identity);
+        StoreEntry entry=(StoreEntry)cache.get(identity);
         if(entry!=null){
             result = (Message)messageContainer.get(entry);
-        }else{
-       
-        for(Iterator i=messageContainer.iterator();i.hasNext();){
-            Message msg=(Message)i.next();
-            if(msg.getMessageId().equals(identity)){
-                result=msg;
-                break;
+        }else{    
+            for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+                Message msg=(Message)messageContainer.get(entry);
+                if(msg.getMessageId().equals(identity)){
+                    result=msg;
+                    cache.put(identity,msg);
+                    break;
+                }
             }
         }
-        }
         return result;
     }
 
@@ -102,10 +105,10 @@
         if(entry!=null){
             messageContainer.remove(entry);
         }else{
-            for(Iterator i=messageContainer.iterator();i.hasNext();){
-                Message msg=(Message)i.next();
+            for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+                Message msg=(Message)messageContainer.get(entry);
                 if(msg.getMessageId().equals(msgId)){
-                    i.remove();
+                    messageContainer.remove(entry);
                     break;
                 }
             }
@@ -119,9 +122,15 @@
         listener.finished();
     }
 
-    public void start() {}
+    public void start() {
+        if( this.usageManager != null )
+            this.usageManager.addUsageListener(this);
+    }
 
-    public void stop() {}
+    public void stop() {
+        if( this.usageManager != null )
+            this.usageManager.removeUsageListener(this);
+    }
 
     public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
         messageContainer.clear();
@@ -141,6 +150,91 @@
      * @param usageManager The UsageManager that is controlling the destination's memory usage.
      */
     public void setUsageManager(UsageManager usageManager) {
+        this.usageManager = usageManager;
+    }
+
+    /**
+     * @return the number of messages held by this destination
+     * @see org.apache.activemq.store.MessageStore#getMessageCount()
+     */
+    public int getMessageCount(){
+       return messageContainer.size();
+    }
+
+    /**
+     * @param id
+     * @return null
+     * @throws Exception
+     * @see org.apache.activemq.store.MessageStore#getPreviousMessageIdToDeliver(org.apache.activemq.command.MessageId)
+     */
+    public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception{
+        return null;
+    }
+
+    /**
+     * @param lastMessageId
+     * @param maxReturned
+     * @param listener
+     * @throws Exception
+     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, int, org.apache.activemq.store.MessageRecoveryListener)
+     */
+    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+        StoreEntry entry = batchEntry;
+        if (entry == null) {
+            entry= messageContainer.getFirst();
+        }else {
+            entry=messageContainer.refresh(entry);
+            entry=messageContainer.getNext(entry);
+        }
+        if(entry!=null){
+            int count = 0;
+            do{
+                Object msg=messageContainer.get(entry);
+                if(msg!=null){
+                    if(msg.getClass()==String.class){
+                        String ref=msg.toString();
+                        listener.recoverMessageReference(ref);
+                    }else{
+                        Message message=(Message)msg;
+                        listener.recoverMessage(message);
+                    }
+                    count++;
+                }
+                batchEntry = entry;
+                entry=messageContainer.getNext(entry);
+            }while(entry!=null&&count<maxReturned);
+        }
+        listener.finished();
+        
+    }
+
+    /**
+     * @param nextToDispatch
+     * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
+     */
+    public void resetBatching(){
+        batchEntry = null;
+        
+    }
+    
+    /**
+     * @return true if the store supports cursors
+     */
+    public boolean isSupportForCursors() {
+        return true;
+    }
+
+    /**
+     * @param memoryManager
+     * @param oldPercentUsage
+     * @param newPercentUsage
+     * @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int, int)
+     */
+    public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
+        if (newPercentUsage == 100) {
+            cache.clear();
+        }
+        
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Fri Nov 17 02:33:57 2006
@@ -19,6 +19,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -37,7 +38,7 @@
 import org.apache.activemq.store.TransactionStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.ConcurrentHashMap;
+
 
 /**
  * @org.apache.xbean.XBean
@@ -106,7 +107,7 @@
             MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
             ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
             ackContainer.setMarshaller(new TopicSubAckMarshaller());
-            rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
+            rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination,maximumDestinationCacheSize);
             messageStores.put(destination,rc);
             if(transactionStore!=null){
                 rc=transactionStore.proxy(rc);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -17,6 +17,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -31,24 +32,20 @@
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @version $Revision: 1.5 $
  */
-public class KahaTopicMessageStore implements TopicMessageStore{
+public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore{
 
-    private ActiveMQDestination destination;
     private ListContainer ackContainer;
-    private ListContainer messageContainer;
     private Map subscriberContainer;
     private Store store;
     private Map subscriberMessages=new ConcurrentHashMap();
 
     public KahaTopicMessageStore(Store store,ListContainer messageContainer,ListContainer ackContainer,
-            MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
-        this.messageContainer=messageContainer;
-        this.destination=destination;
+            MapContainer subsContainer,ActiveMQDestination destination,int maximumCacheSize) throws IOException{
+        super(messageContainer,destination,maximumCacheSize);
         this.store=store;
         this.ackContainer=ackContainer;
         subscriberContainer=subsContainer;
@@ -159,7 +156,7 @@
         }
     }
 
-    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
             MessageRecoveryListener listener) throws Exception{
         String key=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
@@ -195,7 +192,7 @@
     }
 
     public void delete(){
-        messageContainer.clear();
+        super.delete();
         ackContainer.clear();
         subscriberContainer.clear();
     }
@@ -322,82 +319,16 @@
         }
     }
 
-    /**
-     * @param usageManager
-     * @see org.apache.activemq.store.MessageStore#setUsageManager(org.apache.activemq.memory.UsageManager)
-     */
-    public void setUsageManager(UsageManager usageManager){
-        // TODO Auto-generated method stub
-    }
-
-    /**
-     * @throws Exception
-     * @see org.apache.activemq.Service#start()
-     */
-    public void start() throws Exception{
-        // TODO Auto-generated method stub
-    }
-
-    /**
-     * @throws Exception
-     * @see org.apache.activemq.Service#stop()
-     */
-    public void stop() throws Exception{
-        // TODO Auto-generated method stub
-    }
-
-    /**
-     * @param clientId
-     * @param subscriptionName
-     * @see org.apache.activemq.store.TopicMessageStore#resetBatching(java.lang.String, java.lang.String)
-     */
-    public synchronized void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch){
+    public synchronized void resetBatching(String clientId,String subscriptionName){
         String key=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
         if(topicSubContainer!=null){
             topicSubContainer.reset();
-            if(nextToDispatch!=null){
-                StoreEntry entry=topicSubContainer.getListContainer().getFirst();
-                do{
-                    ConsumerMessageRef consumerRef=(ConsumerMessageRef)topicSubContainer.getListContainer().get(entry);
-                    Object msg=messageContainer.get(consumerRef.getMessageEntry());
-                    if(msg!=null){
-                        if(msg.getClass()==String.class){
-                            String ref=msg.toString();
-                            if(msg.toString().equals(nextToDispatch.toString())){
-                                // need to set the entry to the previous one
-                                // to ensure we start in the right place
-                                topicSubContainer
-                                        .setBatchEntry(topicSubContainer.getListContainer().getPrevious(entry));
-                                break;
-                            }
-                        }else{
-                            Message message=(Message)msg;
-                            if(message!=null&&message.getMessageId().equals(nextToDispatch)){
-                                // need to set the entry to the previous one
-                                // to ensure we start in the right place
-                                topicSubContainer
-                                        .setBatchEntry(topicSubContainer.getListContainer().getPrevious(entry));
-                                break;
-                            }
-                        }
-                    }
-                    entry=topicSubContainer.getListContainer().getNext(entry);
-                }while(entry!=null);
-            }
         }
     }
 
-    /**
-     * @param clientId
-     * @param subscriptionName
-     * @param id
-     * @return next messageId
-     * @throws IOException
-     * @see org.apache.activemq.store.TopicMessageStore#getNextMessageIdToDeliver(java.lang.String, java.lang.String,
-     *      org.apache.activemq.command.MessageId)
-     */
-    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
+   
+    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName) throws IOException{
         // TODO Auto-generated method stub
         return null;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Fri Nov 17 02:33:57 2006
@@ -1,20 +1,17 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.store.memory;
 
 import java.io.IOException;
@@ -22,7 +19,6 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -34,82 +30,97 @@
 
 /**
  * An implementation of {@link org.apache.activemq.store.MessageStore} which uses a
- *
+ * 
  * @version $Revision: 1.7 $
  */
-public class MemoryMessageStore implements MessageStore {
+public class MemoryMessageStore implements MessageStore{
 
     protected final ActiveMQDestination destination;
     protected final Map messageTable;
 
-    public MemoryMessageStore(ActiveMQDestination destination) {
-        this(destination, new LinkedHashMap());
+    public MemoryMessageStore(ActiveMQDestination destination){
+        this(destination,new LinkedHashMap());
     }
 
-    public MemoryMessageStore(ActiveMQDestination destination, Map messageTable) {
-        this.destination = destination;
-        this.messageTable = Collections.synchronizedMap(messageTable);
+    public MemoryMessageStore(ActiveMQDestination destination,Map messageTable){
+        this.destination=destination;
+        this.messageTable=Collections.synchronizedMap(messageTable);
     }
 
-    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
-        messageTable.put(message.getMessageId(), message);
+    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+        messageTable.put(message.getMessageId(),message);
     }
-    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
-        messageTable.put(messageId, messageRef);
+
+    public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
+            throws IOException{
+        messageTable.put(messageId,messageRef);
     }
 
-    public Message getMessage(MessageId identity) throws IOException {
-        return (Message) messageTable.get(identity);
+    public Message getMessage(MessageId identity) throws IOException{
+        return (Message)messageTable.get(identity);
     }
-    public String getMessageReference(MessageId identity) throws IOException {
-        return (String) messageTable.get(identity);
+
+    public String getMessageReference(MessageId identity) throws IOException{
+        return (String)messageTable.get(identity);
     }
 
-    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+    public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
         messageTable.remove(ack.getLastMessageId());
     }
-    
-    public void removeMessage(MessageId msgId) throws IOException {
+
+    public void removeMessage(MessageId msgId) throws IOException{
         messageTable.remove(msgId);
     }
 
-    public void recover(MessageRecoveryListener listener) throws Exception {
+    public void recover(MessageRecoveryListener listener) throws Exception{
         // the message table is a synchronizedMap - so just have to synchronize here
         synchronized(messageTable){
             for(Iterator iter=messageTable.values().iterator();iter.hasNext();){
-                Object msg=(Object) iter.next();
+                Object msg=(Object)iter.next();
                 if(msg.getClass()==String.class){
-                    listener.recoverMessageReference((String) msg);
+                    listener.recoverMessageReference((String)msg);
                 }else{
-                    listener.recoverMessage((Message) msg);
+                    listener.recoverMessage((Message)msg);
                 }
             }
             listener.finished();
         }
     }
 
-    public void start() {
+    public void start(){
     }
 
-    public void stop() {
+    public void stop(){
     }
 
-    public void removeAllMessages(ConnectionContext context) throws IOException {
+    public void removeAllMessages(ConnectionContext context) throws IOException{
         messageTable.clear();
     }
 
-    public ActiveMQDestination getDestination() {
+    public ActiveMQDestination getDestination(){
         return destination;
     }
 
-    public void delete() {
+    public void delete(){
         messageTable.clear();
     }
-    
+
     /**
      * @param usageManager The UsageManager that is controlling the destination's memory usage.
      */
-    public void setUsageManager(UsageManager usageManager) {
+    public void setUsageManager(UsageManager usageManager){
     }
 
+    public int getMessageCount(){
+        return messageTable.size();
+    }
+
+    public void resetBatching(MessageId nextToDispatch){
+    }
+
+    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+    }
+
+    public void resetBatching(){
+    }
 }