You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/24 20:57:52 UTC

svn commit: r478967 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/ main/...

Author: rajdavies
Date: Fri Nov 24 11:57:51 2006
New Revision: 478967

URL: http://svn.apache.org/viewvc?view=rev&rev=478967
Log:
Implementation of store-based cursors for Queues and Durable Subscribers,
to fix:
http://issues.apache.org/activemq/browse/AMQ-845
http://issues.apache.org/activemq/browse/AMQ-1062
http://issues.apache.org/activemq/browse/AMQ-1061
http://issues.apache.org/activemq/browse/AMQ-493
http://issues.apache.org/activemq/browse/AMQ-914

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri Nov 24 11:57:51 2006
@@ -427,6 +427,10 @@
                 public void recoverMessageReference(String messageReference) throws Exception{}
 
                 public void finished(){}
+
+                public boolean hasSpace(){
+                    return true;
+                }
             });
         }catch(Throwable e){
             log.error("Failed to browse messages for Subscription "+view,e);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Nov 24 11:57:51 2006
@@ -117,6 +117,7 @@
     public void initialize() throws Exception{
         if(store!=null){
             // Restore the persistent messages.
+            messages.setUsageManager(getUsageManager());
             messages.start();
             if(messages.isRecoveryRequired()){
                 store.recover(new MessageRecoveryListener(){
@@ -145,6 +146,10 @@
 
                     public void finished(){
                     }
+
+                    public boolean hasSpace(){
+                        return true;
+                    }
                 });
             }
         }
@@ -242,6 +247,9 @@
 
             synchronized (consumers) {
                 consumers.remove(sub);
+                if (consumers.isEmpty()) {
+                    messages.gc();
+                }
             }
             sub.remove(context, this);
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Nov 24 11:57:51 2006
@@ -203,6 +203,10 @@
                     }
 
                     public void finished(){}
+
+                    public boolean hasSpace(){
+                        return true;
+                    }
                 });
             }
             
@@ -334,6 +338,10 @@
                     public void recoverMessageReference(String messageReference) throws Exception{}
 
                     public void finished(){}
+
+                    public boolean hasSpace(){
+                       return true;
+                    }
                 });
                 Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination());
                 if(msgs!=null){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Fri Nov 24 11:57:51 2006
@@ -219,6 +219,7 @@
                 PendingMessageCursor cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor(
                         context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),
                         info.getPrefetchSize());
+                cursor.setUsageManager(memoryManager);
                 sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor);
                 durableSubscriptions.put(key,sub);
             }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Fri Nov 24 11:57:51 2006
@@ -14,10 +14,10 @@
 
 package org.apache.activemq.broker.region.cursors;
 
-import java.io.IOException;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.memory.UsageManager;
 
 /**
  * Abstract method holder for pending message (messages awaiting disptach to a consumer) cursor
@@ -27,11 +27,13 @@
 public class AbstractPendingMessageCursor implements PendingMessageCursor{
 
     protected int maxBatchSize=100;
+    protected UsageManager usageManager;
 
     public void start() throws Exception{
     }
 
     public void stop() throws Exception{
+        gc();
     }
 
     public void add(ConnectionContext context,Destination destination) throws Exception{
@@ -86,17 +88,22 @@
     protected void fillBatch() throws Exception{
     }
 
-    /**
-     * Give the cursor a hint that we are about to remove messages from memory only
-     */
     public void resetForGC(){
         reset();
     }
 
-    /**
-     * @param node
-     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
-     */
     public void remove(MessageReference node){
+    }
+    
+    public void gc(){
+    }
+
+   
+    public void setUsageManager(UsageManager usageManager){
+       this.usageManager = usageManager; 
+    }
+    
+    public boolean hasSpace() {
+        return usageManager != null ? !usageManager.isFull() : true;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Fri Nov 24 11:57:51 2006
@@ -19,6 +19,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.memory.UsageManager;
 
 /**
  * Interface to pending message (messages awaiting disptach to a consumer) cursor
@@ -125,4 +126,18 @@
      * @param node
      */
     public void remove(MessageReference node);
+    
+    
+    /**
+     * free up any internal buffers
+     *
+     */
+    public void gc();
+    
+    /**
+     * Set the UsageManager
+     * @param usageManager
+     * @see org.apache.activemq.memory.UsageManager
+     */
+    public void setUsageManager(UsageManager usageManager);
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Fri Nov 24 11:57:51 2006
@@ -20,16 +20,12 @@
 
 import java.io.IOException;
 import java.util.LinkedList;
-import javax.jms.JMSException;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.TopicMessageStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -65,6 +61,7 @@
 
     public void stop() throws Exception{
         store.resetBatching();
+        gc();
     }
 
     /**
@@ -124,6 +121,10 @@
             throws Exception{
         // shouldn't get called
         throw new RuntimeException("Not supported");
+    }
+    
+    public void gc() {
+        batchList.clear();
     }
 
     // implementation

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Fri Nov 24 11:57:51 2006
@@ -25,6 +25,7 @@
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -86,6 +87,7 @@
     public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
         TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName);
         tsp.setMaxBatchSize(getMaxBatchSize());
+        tsp.setUsageManager(usageManager);
         topics.put(destination,tsp);
         storePrefetches.add(tsp);
         if(started){
@@ -200,6 +202,21 @@
             tsp.setMaxBatchSize(maxBatchSize);
         }
         super.setMaxBatchSize(maxBatchSize);
+    }
+    
+    public synchronized void gc() {
+        for(Iterator i=storePrefetches.iterator();i.hasNext();){
+            PendingMessageCursor tsp=(PendingMessageCursor)i.next();
+            tsp.gc();
+        }
+    }
+    
+    public synchronized void setUsageManager(UsageManager usageManager){
+        super.setUsageManager(usageManager);
+        for(Iterator i=storePrefetches.iterator();i.hasNext();){
+            PendingMessageCursor tsp=(PendingMessageCursor)i.next();
+            tsp.setUsageManager(usageManager);
+        }
     }
 
     protected synchronized PendingMessageCursor getNextCursor() throws Exception{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Fri Nov 24 11:57:51 2006
@@ -14,11 +14,11 @@
 
 package org.apache.activemq.broker.region.cursors;
 
-import java.util.Iterator;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -55,6 +55,7 @@
         if(nonPersistent==null){
             nonPersistent=new FilePendingMessageCursor(queue.getDestination(),tmpStore);
             nonPersistent.setMaxBatchSize(getMaxBatchSize());
+            nonPersistent.setUsageManager(usageManager);
         }
         nonPersistent.start();
         persistent.start();
@@ -65,8 +66,10 @@
         started=false;
         if(nonPersistent!=null){
             nonPersistent.stop();
+            nonPersistent.gc();
         }
         persistent.stop();
+        persistent.gc();
         pendingCount=0;
     }
 
@@ -162,10 +165,29 @@
         }
         super.setMaxBatchSize(maxBatchSize);
     }
+    
+    public void gc() {
+        if (persistent != null) {
+            persistent.gc();
+        }
+        if (nonPersistent != null) {
+            nonPersistent.gc();
+        }
+    }
+    
+    public void setUsageManager(UsageManager usageManager){
+        super.setUsageManager(usageManager);
+        if (persistent != null) {
+            persistent.setUsageManager(usageManager);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setUsageManager(usageManager);
+        }
+     }
 
     protected synchronized PendingMessageCursor getNextCursor() throws Exception{
         if(currentCursor==null||currentCursor.isEmpty()){
-            currentCursor = currentCursor == persistent ? nonPersistent : persistent;
+            currentCursor=currentCursor==persistent?nonPersistent:persistent;
         }
         return currentCursor;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Fri Nov 24 11:57:51 2006
@@ -66,6 +66,7 @@
 
     public void stop() throws Exception{
         store.resetBatching(clientId,subscriberName);
+        gc();
     }
 
     /**
@@ -136,6 +137,10 @@
             Message message=(Message)batchList.getLast();
           
         }
+    }
+    
+    public void gc() {
+        batchList.clear();
     }
     
     public String toString() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java Fri Nov 24 11:57:51 2006
@@ -26,4 +26,5 @@
     void recoverMessage(Message message) throws Exception;
     void recoverMessageReference(String messageReference) throws Exception;
     void finished();
+    boolean hasSpace();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri Nov 24 11:57:51 2006
@@ -235,14 +235,18 @@
                     new JDBCMessageRecoveryListener(){
 
                         public void recoverMessage(long sequenceId,byte[] data) throws Exception{
-                            Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
-                            msg.getMessageId().setBrokerSequenceId(sequenceId);
-                            listener.recoverMessage(msg);
-                            lastMessageId.set(sequenceId);
+                            if(listener.hasSpace()){
+                                Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
+                                msg.getMessageId().setBrokerSequenceId(sequenceId);
+                                listener.recoverMessage(msg);
+                                lastMessageId.set(sequenceId);
+                            }
                         }
 
                         public void recoverMessageReference(String reference) throws Exception{
-                            listener.recoverMessageReference(reference);
+                            if(listener.hasSpace()) {
+                                listener.recoverMessageReference(reference);
+                            }
                         }
 
                         public void finished(){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Nov 24 11:57:51 2006
@@ -108,10 +108,12 @@
                     new JDBCMessageRecoveryListener(){
 
                         public void recoverMessage(long sequenceId,byte[] data) throws Exception{
-                            Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
-                            msg.getMessageId().setBrokerSequenceId(sequenceId);
-                            listener.recoverMessage(msg);
-                            finalLast.set(sequenceId);
+                            if(listener.hasSpace()){
+                                Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
+                                msg.getMessageId().setBrokerSequenceId(sequenceId);
+                                listener.recoverMessage(msg);
+                                finalLast.set(sequenceId);
+                            }
                         }
 
                         public void recoverMessageReference(String reference) throws Exception{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Nov 24 11:57:51 2006
@@ -370,6 +370,7 @@
         ResultSet rs=null;
         try{
             s=c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
+            s.setMaxRows(maxReturned);
             s.setString(1,destination.getQualifiedName());
             s.setString(2,clientId);
             s.setString(3,subscriptionName);
@@ -639,8 +640,9 @@
         ResultSet rs=null;
         try{
             s=c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
+            s.setMaxRows(maxReturned);
             s.setString(1,destination.getQualifiedName());
-            s.setLong(4,nextSeq);
+            s.setLong(2,nextSeq);
             rs=s.executeQuery();
             int count=0;
             if(statements.isUseExternalMessageReferences()){
@@ -654,7 +656,9 @@
                     count++;
                 }
             }
-        }finally{
+        }catch(Exception e) {
+            e.printStackTrace();
+        }finally {
             close(rs);
             close(s);
             listener.finished();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java Fri Nov 24 11:57:51 2006
@@ -373,6 +373,10 @@
             public void finished(){
                 listener.finished();
             }
+            public boolean hasSpace(){
+                // TODO Auto-generated method stub
+                return true;
+            }
         });
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java Fri Nov 24 11:57:51 2006
@@ -68,6 +68,9 @@
             public void finished(){
                 listener.finished();
             }
+            public boolean hasSpace(){
+                return true;
+            }
         });
 
     }
@@ -86,6 +89,9 @@
             
             public void finished(){
                 listener.finished();
+            }
+            public boolean hasSpace(){
+                return true;
             }
         });
         

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Fri Nov 24 11:57:51 2006
@@ -210,7 +210,7 @@
                 }
                 batchEntry = entry;
                 entry=messageContainer.getNext(entry);
-            }while(entry!=null&&count<maxReturned);
+            }while(entry!=null&&count<maxReturned&&listener.hasSpace());
         }
         listener.finished();
         

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Fri Nov 24 11:57:51 2006
@@ -184,7 +184,7 @@
                     }
                     container.setBatchEntry(entry);
                     entry=container.getListContainer().getNext(entry);
-                }while(entry!=null&&count<maxReturned);
+                }while(entry!=null&&count<maxReturned&&listener.hasSpace());
             }
         }
         listener.finished();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java Fri Nov 24 11:57:51 2006
@@ -350,7 +350,7 @@
                 }
                 batchEntry=entry;
                 entry=messageContainer.getNext(entry);
-            }while(entry!=null&&count<maxReturned);
+            }while(entry!=null&&count<maxReturned&&listener.hasSpace());
         }
         listener.finished();
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri Nov 24 11:57:51 2006
@@ -186,7 +186,7 @@
                     }
                     container.setBatchEntry(entry);
                     entry=container.getListContainer().getNext(entry);
-                }while(entry!=null&&count<maxReturned);
+                }while(entry!=null&&count<maxReturned && listener.hasSpace());
             }
         }
         listener.finished();

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java Fri Nov 24 11:57:51 2006
@@ -28,7 +28,7 @@
     public void testManyProducersManyConsumers() throws Exception {
         consumerCount = 20;
         producerCount = 20;
-        messageCount  = 500;
+        messageCount  = 50;
         messageSize   = 1; 
         prefetchCount = 10;
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java Fri Nov 24 11:57:51 2006
@@ -1,220 +1,56 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker.region.cursors;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
 import javax.jms.Topic;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
-import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
 /**
  * @version $Revision: 1.3 $
  */
-public class CursorDurableTest extends TestCase{
-    
-    protected static final Log log = LogFactory.getLog(CursorDurableTest.class);
-
-    protected static final int MESSAGE_COUNT=100;
-    protected static final int PREFETCH_SIZE = 5;
-    protected BrokerService broker;
-    protected String bindAddress="tcp://localhost:60706";
-    protected int topicCount=0;
-
-    public void testSendFirstThenConsume() throws Exception{
-        ConnectionFactory factory=createConnectionFactory();
-        Connection consumerConnection= getConsumerConnection(factory);
-        //create durable subs
-        MessageConsumer consumer = getConsumer(consumerConnection);
-        consumerConnection.close();
-        
-        Connection producerConnection = factory.createConnection();
-        producerConnection.start();
-        Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(getTopic(session));
-        List senderList = new ArrayList();
-        for (int i =0; i < MESSAGE_COUNT; i++) {
-            Message msg=session.createTextMessage("test"+i);
-            senderList.add(msg);
-            producer.send(msg);
-        }
-        producerConnection.close();
-        
-        //now consume the messages
-        consumerConnection= getConsumerConnection(factory);
-        //create durable subs
-        consumer = getConsumer(consumerConnection);
-        List consumerList = new ArrayList();
-        for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message msg = consumer.receive();
-            consumerList.add(msg);
-        }
-        assertEquals(senderList,consumerList);
-        consumerConnection.close();       
-    }
-    
-    public void testSendWhilstConsume() throws Exception{
-        ConnectionFactory factory=createConnectionFactory();
-        Connection consumerConnection= getConsumerConnection(factory);
-        //create durable subs
-        MessageConsumer consumer = getConsumer(consumerConnection);
-        consumerConnection.close();
-        
-        Connection producerConnection = factory.createConnection();
-        producerConnection.start();
-        Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(getTopic(session));
-        List senderList = new ArrayList();
-        for (int i =0; i < MESSAGE_COUNT/10; i++) {
-            TextMessage msg=session.createTextMessage("test"+i);
-            senderList.add(msg);
-            producer.send(msg);
-        }
-        
-        
-        //now consume the messages
-        consumerConnection= getConsumerConnection(factory);
-        //create durable subs
-        consumer = getConsumer(consumerConnection);
-        final List consumerList = new ArrayList();
-        
-        final CountDownLatch latch = new CountDownLatch(1);
-        consumer.setMessageListener(new MessageListener() {
-
-            public void onMessage(Message msg){
-                try{
-                    //sleep to act as a slow consumer
-                    //which will force a mix of direct and polled dispatching
-                    //using the cursor on the broker
-                    Thread.sleep(50);
-                }catch(Exception e){
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                }
-                consumerList.add(msg);
-                if (consumerList.size()==MESSAGE_COUNT) {
-                    latch.countDown();
-                }
-                
-            }
-            
-        });
-        for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) {
-            TextMessage msg=session.createTextMessage("test"+i);
-            senderList.add(msg);
-           
-            producer.send(msg);
-            
-           
-        }   
-        
-        
-        latch.await(300000,TimeUnit.MILLISECONDS);
-        assertEquals("Still dipatching - count down latch not sprung" , latch.getCount(),0);
-        assertEquals("cosumerList - expected: " + MESSAGE_COUNT + " but was: " + consumerList.size(),consumerList.size(),senderList.size());
-        assertEquals(senderList,consumerList);
-        producerConnection.close();
-        consumerConnection.close();       
-    }
-    
-    
+public class CursorDurableTest extends CursorSupport{
 
-    protected Topic getTopic(Session session) throws JMSException{
+    protected Destination getDestination(Session session) throws JMSException{
         String topicName=getClass().getName();
         return session.createTopic(topicName);
     }
-    
+
     protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{
         Connection connection=fac.createConnection();
         connection.setClientID("testConsumer");
         connection.start();
         return connection;
-        
     }
-    
+
     protected MessageConsumer getConsumer(Connection connection) throws Exception{
-        Session consumerSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        Topic topic = getTopic(consumerSession);
-        MessageConsumer  consumer = consumerSession.createDurableSubscriber(topic,"testConsumer");
+        Session consumerSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        Topic topic=(Topic)getDestination(consumerSession);
+        MessageConsumer consumer=consumerSession.createDurableSubscriber(topic,"testConsumer");
         return consumer;
     }
-
-    
-
-    protected void setUp() throws Exception{
-        if(broker==null){
-            broker=createBroker();
-        }
-        super.setUp();
-    }
-
-    protected void tearDown() throws Exception{
-        super.tearDown();
-        
-        if(broker!=null){
-          broker.stop();
-        }
-    }
-
-    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
-        ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress);
-        Properties props = new Properties();
-        props.setProperty("prefetchPolicy.durableTopicPrefetch","" + PREFETCH_SIZE);
-        props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch","" + PREFETCH_SIZE);
-        cf.setProperties(props);
-        return cf;
-    }
     
-   
-
-    protected BrokerService createBroker() throws Exception{
-        BrokerService answer=new BrokerService();
-        configureBroker(answer);
+    protected void configureBroker(BrokerService answer) throws Exception{
         answer.setDeleteAllMessagesOnStartup(true);
         answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
-        answer.start();
-        return answer;
-    }
-
-    protected void configureBroker(BrokerService answer) throws Exception{
-        
         answer.addConnector(bindAddress);
         answer.setDeleteAllMessagesOnStartup(true);
     }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java?view=auto&rev=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java Fri Nov 24 11:57:51 2006
@@ -0,0 +1,76 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
+
+/**
+ * Runs the {@link CursorSupport} scenarios against a queue, with the broker's default destination policy set to
+ * {@link StorePendingQueueMessageStoragePolicy}.
+ * 
+ * @version $Revision: 1.3 $
+ */
+public class CursorQueueStoreTest extends CursorSupport{
+
+    /**
+     * Creates the queue the test sends to and consumes from; its name is "QUEUE" followed by the runtime class name,
+     * so each subclass uses its own queue.
+     */
+    protected Destination getDestination(Session session) throws JMSException{
+        return session.createQueue("QUEUE"+getClass().getName());
+    }
+
+    /**
+     * Opens and starts a connection with the client id "testConsumer".
+     */
+    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{
+        Connection connection=fac.createConnection();
+        connection.setClientID("testConsumer");
+        connection.start();
+        return connection;
+    }
+
+    /**
+     * Creates a consumer on the test queue using a non-transacted, auto-acknowledging session.
+     */
+    protected MessageConsumer getConsumer(Connection connection) throws Exception{
+        Session consumerSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        return consumerSession.createConsumer(getDestination(consumerSession));
+    }
+
+    /**
+     * Installs {@link StorePendingQueueMessageStoragePolicy} as the default pending message policy for queues, asks
+     * the broker to delete all messages on startup and adds a connector on {@code bindAddress}.
+     */
+    protected void configureBroker(BrokerService answer) throws Exception{
+        PolicyEntry policy=new PolicyEntry();
+        policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
+        PolicyMap pMap=new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        answer.setDestinationPolicy(pMap);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(bindAddress);
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java?view=auto&rev=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java Fri Nov 24 11:57:51 2006
@@ -0,0 +1,238 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Base class for the pending message cursor tests. It starts a broker configured by the subclass and runs two
+ * scenarios against the subclass's destination and consumer: consuming a backlog that was sent while no consumer
+ * was connected, and consuming through a slow listener while the producer is still sending.
+ * 
+ * @version $Revision: 1.3 $
+ */
+public abstract class CursorSupport extends TestCase{
+
+    protected static final Log log=LogFactory.getLog(CursorSupport.class);
+    protected static final int MESSAGE_COUNT=500;
+    protected static final int PREFETCH_SIZE=50;
+    /** Upper bound, in milliseconds, on any single wait for messages so that a lost message fails the test. */
+    private static final long MAX_WAIT_MILLIS=300000;
+    protected BrokerService broker;
+    protected String bindAddress="tcp://localhost:60706";
+
+    /**
+     * Returns the destination under test, created from the given session.
+     */
+    protected abstract Destination getDestination(Session session) throws JMSException;
+
+    /**
+     * Creates the consumer under test on the given connection.
+     */
+    protected abstract MessageConsumer getConsumer(Connection connection) throws Exception;
+
+    /**
+     * Applies the test specific settings to the broker before {@link #createBroker()} starts it.
+     */
+    protected abstract void configureBroker(BrokerService answer) throws Exception;
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} messages while no consumer is connected, then connects a consumer and checks that
+     * it receives the same messages in the same order.
+     */
+    public void testSendFirstThenConsume() throws Exception{
+        ConnectionFactory factory=createConnectionFactory();
+        // connect once up front; for a durable subscriber subclass this registers the subscription before sending
+        Connection consumerConnection=getConsumerConnection(factory);
+        try{
+            getConsumer(consumerConnection);
+        }finally{
+            consumerConnection.close();
+        }
+        List senderList=new ArrayList();
+        Connection producerConnection=factory.createConnection();
+        try{
+            producerConnection.start();
+            Session session=producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer=session.createProducer(getDestination(session));
+            for(int i=0;i<MESSAGE_COUNT;i++){
+                Message msg=session.createTextMessage("test"+i);
+                senderList.add(msg);
+                producer.send(msg);
+            }
+        }finally{
+            producerConnection.close();
+        }
+        // now consume the messages
+        consumerConnection=getConsumerConnection(factory);
+        try{
+            MessageConsumer consumer=getConsumer(consumerConnection);
+            List consumerList=new ArrayList();
+            for(int i=0;i<MESSAGE_COUNT;i++){
+                // bounded wait: an unbounded receive() would hang the whole build if a message went missing
+                Message msg=consumer.receive(MAX_WAIT_MILLIS);
+                assertNotNull("Timed out waiting for message "+i+" of "+MESSAGE_COUNT,msg);
+                consumerList.add(msg);
+            }
+            assertEquals(senderList,consumerList);
+        }finally{
+            consumerConnection.close();
+        }
+    }
+
+    /**
+     * Sends a tenth of the messages before the consumer connects and the rest while a deliberately slow listener is
+     * consuming, then checks that every message arrived in order.
+     */
+    public void testSendWhilstConsume() throws Exception{
+        ConnectionFactory factory=createConnectionFactory();
+        // connect once up front; for a durable subscriber subclass this registers the subscription before sending
+        Connection consumerConnection=getConsumerConnection(factory);
+        try{
+            getConsumer(consumerConnection);
+        }finally{
+            consumerConnection.close();
+        }
+        Connection producerConnection=factory.createConnection();
+        try{
+            producerConnection.start();
+            Session session=producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer=session.createProducer(getDestination(session));
+            List senderList=new ArrayList();
+            for(int i=0;i<MESSAGE_COUNT/10;i++){
+                TextMessage msg=session.createTextMessage("test"+i);
+                senderList.add(msg);
+                producer.send(msg);
+            }
+            // now consume the messages
+            consumerConnection=getConsumerConnection(factory);
+            try{
+                MessageConsumer consumer=getConsumer(consumerConnection);
+                final List consumerList=new ArrayList();
+                final CountDownLatch latch=new CountDownLatch(1);
+                consumer.setMessageListener(new MessageListener(){
+
+                    public void onMessage(Message msg){
+                        try{
+                            // sleep to act as a slow consumer
+                            // which will force a mix of direct and polled dispatching
+                            // using the cursor on the broker
+                            Thread.sleep(50);
+                        }catch(InterruptedException e){
+                            // keep the interrupt visible to the thread that is delivering the message
+                            Thread.currentThread().interrupt();
+                            log.warn("Interrupted while simulating a slow consumer",e);
+                        }
+                        consumerList.add(msg);
+                        if(consumerList.size()==MESSAGE_COUNT){
+                            latch.countDown();
+                        }
+                    }
+                });
+                for(int i=MESSAGE_COUNT/10;i<MESSAGE_COUNT;i++){
+                    TextMessage msg=session.createTextMessage("test"+i);
+                    senderList.add(msg);
+                    producer.send(msg);
+                }
+                assertTrue("Still dispatching - count down latch not sprung",
+                        latch.await(MAX_WAIT_MILLIS,TimeUnit.MILLISECONDS));
+                // JUnit expects (message, expected, actual)
+                assertEquals("Unexpected number of messages consumed",senderList.size(),consumerList.size());
+                assertEquals(senderList,consumerList);
+            }finally{
+                consumerConnection.close();
+            }
+        }finally{
+            producerConnection.close();
+        }
+    }
+
+    /**
+     * Opens and starts a connection with the client id "testConsumer".
+     */
+    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{
+        Connection connection=fac.createConnection();
+        connection.setClientID("testConsumer");
+        connection.start();
+        return connection;
+    }
+
+    /**
+     * Creates and starts the broker unless one already exists.
+     */
+    protected void setUp() throws Exception{
+        if(broker==null){
+            broker=createBroker();
+        }
+        super.setUp();
+    }
+
+    /**
+     * Stops the broker, even if the superclass tear down fails.
+     */
+    protected void tearDown() throws Exception{
+        try{
+            super.tearDown();
+        }finally{
+            if(broker!=null){
+                broker.stop();
+                broker=null;
+            }
+        }
+    }
+
+    /**
+     * Creates a connection factory for {@link #bindAddress} whose durable topic, optimized durable topic and queue
+     * prefetch limits are all set to {@link #PREFETCH_SIZE}.
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
+        ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress);
+        Properties props=new Properties();
+        props.setProperty("prefetchPolicy.durableTopicPrefetch",""+PREFETCH_SIZE);
+        props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch",""+PREFETCH_SIZE);
+        props.setProperty("prefetchPolicy.queuePrefetch",""+PREFETCH_SIZE);
+        cf.setProperties(props);
+        return cf;
+    }
+
+    /**
+     * Creates a broker, lets the subclass configure it and starts it.
+     */
+    protected BrokerService createBroker() throws Exception{
+        BrokerService answer=new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java?view=auto&rev=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java Fri Nov 24 11:57:51 2006
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Variant of {@link CursorQueueStoreTest} whose broker persists through a {@link KahaPersistenceAdapter}.
+ *
+ * @version $Revision: 1.3 $
+ */
+public class KahaQueueStoreTest extends CursorQueueStoreTest{
+
+    /** Logger for this class; it hides the {@code log} field inherited from {@link CursorSupport}. */
+    protected static final Log log = LogFactory.getLog(KahaQueueStoreTest.class);
+
+    /**
+     * Gives the broker a Kaha persistence adapter stored under "activemq-data/durableTest", makes
+     * {@link StorePendingQueueMessageStoragePolicy} the default pending message policy for queues, adds a connector
+     * on {@code bindAddress} and requests that all messages are deleted on startup.
+     */
+    protected void configureBroker(BrokerService answer) throws Exception{
+        answer.setPersistenceAdapter(new KahaPersistenceAdapter(new File("activemq-data/durableTest")));
+        PolicyEntry queuePolicy=new PolicyEntry();
+        queuePolicy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
+        PolicyMap policies=new PolicyMap();
+        policies.setDefaultEntry(queuePolicy);
+        answer.setDestinationPolicy(policies);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native