You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/10/09 15:05:22 UTC

svn commit: r454368 [3/3] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/kaha/ main...

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Mon Oct  9 06:05:20 2006
@@ -156,14 +156,34 @@
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
     }
-    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
-        MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
+    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
         // the message table is a synchronizedMap - so just have to synchronize here
+        boolean matchFound = false;
         synchronized(messageTable){
             for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
                 Map.Entry entry=(Entry) iter.next();
-                if(entry.getKey().equals(lastAck)){
-                    return (Message) entry.getValue();
+                if(!matchFound && entry.getKey().equals(id)){
+                    matchFound = true;
+                }else if (matchFound) {
+                    Message msg =  (Message) entry.getValue();
+                    return msg.getMessageId();
+                }
+            }
+        }
+        return null;
+    }
+    
+    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
+        // the message table is a synchronizedMap - so just have to synchronize here
+        Message last= null;
+        synchronized(messageTable){
+            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
+                Map.Entry entry=(Entry) iter.next();
+                
+                if(entry.getKey().equals(id)){
+                    return last != null ? last.getMessageId() : null;
+                }else {
+                    last = (Message)entry.getValue();
                 }
             }
         }
@@ -184,6 +204,9 @@
             }
         }
         return result;
+    }
+    
+    public void resetBatching(String clientId,String subscriptionName,MessageId id) {
     }
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Mon Oct  9 06:05:20 2006
@@ -1,20 +1,17 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.store.rapid;
 
 import java.io.IOException;
@@ -23,7 +20,6 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-
 import org.apache.activeio.journal.RecordLocation;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -42,7 +38,6 @@
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 
@@ -51,76 +46,73 @@
  * 
  * @version $Revision: 1.13 $
  */
-public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore {
-    
-    private static final Log log = LogFactory.getLog(RapidTopicMessageStore.class);
+public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore{
 
-    private HashMap ackedLastAckLocations = new HashMap();
+    private static final Log log=LogFactory.getLog(RapidTopicMessageStore.class);
+    private HashMap ackedLastAckLocations=new HashMap();
     private final MapContainer subscriberContainer;
     private final MapContainer ackContainer;
     private final Store store;
     private Map subscriberAcks=new ConcurrentHashMap();
 
-    public RapidTopicMessageStore(RapidPersistenceAdapter adapter, ActiveMQTopic destination, MapContainer messageContainer, MapContainer subsContainer, MapContainer ackContainer) throws IOException {
-        super(adapter, destination, messageContainer);
-        this.subscriberContainer = subsContainer;
-        this.ackContainer = ackContainer;
+    public RapidTopicMessageStore(RapidPersistenceAdapter adapter,ActiveMQTopic destination,
+            MapContainer messageContainer,MapContainer subsContainer,MapContainer ackContainer) throws IOException{
+        super(adapter,destination,messageContainer);
+        this.subscriberContainer=subsContainer;
+        this.ackContainer=ackContainer;
         this.store=adapter.getStore();
-        
         for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
             Object key=i.next();
             addSubscriberAckContainer(key);
         }
     }
 
-    public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
-
+    public void recoverSubscription(String clientId,String subscriptionName,final MessageRecoveryListener listener)
+            throws Exception{
         String key=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        ListContainer list=(ListContainer)subscriberAcks.get(key);
         if(list!=null){
             for(Iterator i=list.iterator();i.hasNext();){
                 Object msg=messageContainer.get(i.next());
                 if(msg!=null){
                     if(msg.getClass()==String.class){
-                        listener.recoverMessageReference((String) msg);
+                        listener.recoverMessageReference((String)msg);
                     }else{
-                        listener.recoverMessage((Message) msg);
+                        listener.recoverMessage((Message)msg);
                     }
                 }
                 listener.finished();
             }
-        } else {
+        }else{
             listener.finished();
         }
-        
     }
-    
+
     public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
-                    MessageRecoveryListener listener) throws Exception{
+            MessageRecoveryListener listener) throws Exception{
         String key=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        ListContainer list=(ListContainer)subscriberAcks.get(key);
         if(list!=null){
             boolean startFound=false;
-            int count = 0;
-            for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
+            int count=0;
+            for(Iterator i=list.iterator();i.hasNext()&&count<maxReturned;){
                 Object msg=messageContainer.get(i.next());
                 if(msg!=null){
                     if(msg.getClass()==String.class){
                         String ref=msg.toString();
-                        if (startFound || lastMessageId == null){
+                        if(startFound||lastMessageId==null){
                             listener.recoverMessageReference(ref);
                             count++;
-                        }
-                        else if(startFound||ref.equals(lastMessageId.toString())){
+                        }else if(!startFound||ref.equals(lastMessageId.toString())){
                             startFound=true;
                         }
                     }else{
-                        Message message=(Message) msg;
-                        if(startFound||message.getMessageId().equals(lastMessageId)){
-                            startFound=true;
-                        }else{
+                        Message message=(Message)msg;
+                        if(startFound||lastMessageId==null){
                             listener.recoverMessage(message);
                             count++;
+                        }else if(!startFound&&message.getMessageId().equals(lastMessageId)){
+                            startFound=true;
                         }
                     }
                 }
@@ -131,11 +123,12 @@
         }
     }
 
-    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
-        return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
+    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
+        return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
     }
 
-    public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+            throws IOException{
         SubscriptionInfo info=new SubscriptionInfo();
         info.setDestination(destination);
         info.setClientId(clientId);
@@ -163,148 +156,139 @@
             super.addMessage(context,message);
         }
     }
-    
-    
+
     /**
      */
-    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
-        final boolean debug = log.isDebugEnabled();
-        
-        JournalTopicAck ack = new JournalTopicAck();
+    public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId)
+            throws IOException{
+        final boolean debug=log.isDebugEnabled();
+        JournalTopicAck ack=new JournalTopicAck();
         ack.setDestination(destination);
         ack.setMessageId(messageId);
         ack.setMessageSequenceId(messageId.getBrokerSequenceId());
         ack.setSubscritionName(subscriptionName);
         ack.setClientId(clientId);
-        ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
-        final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
-        
-        final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);        
-        if( !context.isInTransaction() ) {
-            if( debug )
+        ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
+        final RecordLocation location=peristenceAdapter.writeCommand(ack,false);
+        final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+        if(!context.isInTransaction()){
+            if(debug)
                 log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
-            acknowledge(messageId, location, key);
-        } else {
-            if( debug )
+            acknowledge(messageId,location,key);
+        }else{
+            if(debug)
                 log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
-            synchronized (this) {
+            synchronized(this){
                 inFlightTxLocations.add(location);
             }
-            transactionStore.acknowledge(this, ack, location);
+            transactionStore.acknowledge(this,ack,location);
             context.getTransaction().addSynchronization(new Synchronization(){
-                public void afterCommit() throws Exception {                    
-                    if( debug )
+
+                public void afterCommit() throws Exception{
+                    if(debug)
                         log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
-                    synchronized (RapidTopicMessageStore.this) {
+                    synchronized(RapidTopicMessageStore.this){
                         inFlightTxLocations.remove(location);
-                        acknowledge(messageId, location, key);
+                        acknowledge(messageId,location,key);
                     }
                 }
-                public void afterRollback() throws Exception {                    
-                    if( debug )
+
+                public void afterRollback() throws Exception{
+                    if(debug)
                         log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
-                    synchronized (RapidTopicMessageStore.this) {
+                    synchronized(RapidTopicMessageStore.this){
                         inFlightTxLocations.remove(location);
                     }
                 }
             });
         }
-        
     }
-    
-    public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
-        try {
-            synchronized(this) {
+
+    public void replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,MessageId messageId){
+        try{
+            synchronized(this){
                 String subcriberId=getSubscriptionKey(clientId,subscritionName);
                 String id=messageId.toString();
-                ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
+                ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
                 if(container!=null){
-                    //container.remove(id);
+                    // container.remove(id);
                     container.removeFirst();
-                    AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+                    AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
                     if(count!=null){
                         if(count.decrementAndGet()>0){
                             ackContainer.put(id,count);
-                        } else {
+                        }else{
                             // no more references to message messageContainer so remove it
                             messageContainer.remove(messageId.toString());
                         }
                     }
                 }
             }
-        }
-        catch (Throwable e) {
-            log.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + e);
+        }catch(Throwable e){
+            log.debug("Could not replay acknowledge for message '"+messageId
+                    +"'.  Message may have already been acknowledged. reason: "+e);
         }
     }
-        
 
     /**
      * @param messageId
      * @param location
      * @param key
      */
-    private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
-        synchronized(this) {
-		    lastLocation = location;
-		    ackedLastAckLocations.put(key, messageId);
-            
+    private void acknowledge(MessageId messageId,RecordLocation location,SubscriptionKey key){
+        synchronized(this){
+            lastLocation=location;
+            ackedLastAckLocations.put(key,messageId);
             String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName());
             String id=messageId.toString();
-            ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
+            ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
             if(container!=null){
-                //container.remove(id);
+                // container.remove(id);
                 container.removeFirst();
-                AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+                AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
                 if(count!=null){
                     if(count.decrementAndGet()>0){
                         ackContainer.put(id,count);
-                    } else {
+                    }else{
                         // no more references to message messageContainer so remove it
                         messageContainer.remove(messageId.toString());
                     }
                 }
             }
-		}
+        }
     }
-    
+
     protected String getSubscriptionKey(String clientId,String subscriberName){
         String result=clientId+":";
         result+=subscriberName!=null?subscriberName:"NOT_SET";
         return result;
     }
 
-    
-    public RecordLocation checkpoint() throws IOException {
-        
-		ArrayList cpAckedLastAckLocations;
-
+    public RecordLocation checkpoint() throws IOException{
+        ArrayList cpAckedLastAckLocations;
         // swap out the hash maps..
-        synchronized (this) {
-            cpAckedLastAckLocations = new ArrayList(this.ackedLastAckLocations.values());
-            this.ackedLastAckLocations = new HashMap();
+        synchronized(this){
+            cpAckedLastAckLocations=new ArrayList(this.ackedLastAckLocations.values());
+            this.ackedLastAckLocations=new HashMap();
         }
-
-        RecordLocation rc = super.checkpoint();
-        if(!cpAckedLastAckLocations.isEmpty()) {
+        RecordLocation rc=super.checkpoint();
+        if(!cpAckedLastAckLocations.isEmpty()){
             Collections.sort(cpAckedLastAckLocations);
-            RecordLocation t = (RecordLocation) cpAckedLastAckLocations.get(0);
-            if( rc == null || t.compareTo(rc)<0 ) {
-                rc = t;
+            RecordLocation t=(RecordLocation)cpAckedLastAckLocations.get(0);
+            if(rc==null||t.compareTo(rc)<0){
+                rc=t;
             }
         }
-        
         return rc;
     }
 
-
-    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+    public void deleteSubscription(String clientId,String subscriptionName) throws IOException{
         String key=getSubscriptionKey(clientId,subscriptionName);
         subscriberContainer.remove(key);
-        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        ListContainer list=(ListContainer)subscriberAcks.get(key);
         for(Iterator i=list.iterator();i.hasNext();){
             String id=i.next().toString();
-            AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+            AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
             if(count!=null){
                 if(count.decrementAndGet()>0){
                     ackContainer.put(id,count);
@@ -316,30 +300,63 @@
         }
     }
 
-    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-        return (SubscriptionInfo[]) subscriberContainer.values().toArray(
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+        return (SubscriptionInfo[])subscriberContainer.values().toArray(
                 new SubscriptionInfo[subscriberContainer.size()]);
     }
 
     protected void addSubscriberAckContainer(Object key) throws IOException{
-        ListContainer container=store.getListContainer(key,"topic-subs");
+        ListContainer container=store.getListContainer(key,"durable-subs");
         Marshaller marshaller=new StringMarshaller();
         container.setMarshaller(marshaller);
         subscriberAcks.put(key,container);
     }
-    
-    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+
+    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
+            throws IOException{
+        MessageId result=null;
+        boolean getNext=false;
         String key=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer list=(ListContainer) subscriberAcks.get(key);
-        Iterator iter = list.iterator();
-        return (Message) (iter.hasNext() ? iter.next() : null);
-        
+        ListContainer list=(ListContainer)subscriberAcks.get(key);
+        Iterator iter=list.iterator();
+        for(Iterator i=list.iterator();i.hasNext();){
+            String id=i.next().toString();
+            if(id.equals(messageId.toString())){
+                getNext=true;
+            }else if(getNext){
+                result=new MessageId(id);
+                break;
+            }
+        }
+        return result;
     }
-    
+
+    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
+            throws IOException{
+        MessageId result=null;
+        String previousId=null;
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        ListContainer list=(ListContainer)subscriberAcks.get(key);
+        Iterator iter=list.iterator();
+        for(Iterator i=list.iterator();i.hasNext();){
+            String id=i.next().toString();
+            if(id.equals(messageId.toString())){
+                if(previousId!=null){
+                    result=new MessageId(previousId);
+                }
+                break;
+            }
+            previousId=id;
+        }
+        return result;
+    }
+
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         String key=getSubscriptionKey(clientId,subscriberName);
-        ListContainer list=(ListContainer) subscriberAcks.get(key);
+        ListContainer list=(ListContainer)subscriberAcks.get(key);
         return list.size();
     }
 
+    public void resetBatching(String clientId,String subscriptionName,MessageId nextId){
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Mon Oct  9 06:05:20 2006
@@ -507,7 +507,7 @@
         session.commit();
 
         // Only pick up the first message.
-        Message message1 = message1 = consumer.receive(1000);
+        Message message1 = consumer.receive(1000);
         assertNotNull(message1);        
         
         // Don't acknowledge yet.  This should keep our prefetch full.        

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java?view=auto&rev=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java Mon Oct  9 06:05:20 2006
@@ -0,0 +1,215 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+/**
+ * @version $Revision: 1.3 $
+ */
+public class CursorDurableTest extends TestCase{
+    
+    protected static final Log log = LogFactory.getLog(CursorDurableTest.class);
+
+    protected static final int MESSAGE_COUNT=50;
+    protected static final int PREFETCH_SIZE = 5;
+    protected BrokerService broker;
+    protected String bindAddress="tcp://localhost:60706";
+    protected int topicCount=0;
+
+    public void testSendFirstThenConsume() throws Exception{
+        ConnectionFactory factory=createConnectionFactory();
+        Connection consumerConnection= getConsumerConnection(factory);
+        //create durable subs
+        MessageConsumer consumer = getConsumer(consumerConnection);
+        consumerConnection.close();
+        
+        Connection producerConnection = factory.createConnection();
+        producerConnection.start();
+        Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(getTopic(session));
+        List senderList = new ArrayList();
+        for (int i =0; i < MESSAGE_COUNT; i++) {
+            Message msg=session.createTextMessage("test"+i);
+            senderList.add(msg);
+            producer.send(msg);
+        }
+        producerConnection.close();
+        
+        //now consume the messages
+        consumerConnection= getConsumerConnection(factory);
+        //create durable subs
+        consumer = getConsumer(consumerConnection);
+        List consumerList = new ArrayList();
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = consumer.receive();
+            consumerList.add(msg);
+        }
+        assertEquals(senderList,consumerList);
+        consumerConnection.close();       
+    }
+    
+    public void testSendWhilstConsume() throws Exception{
+        ConnectionFactory factory=createConnectionFactory();
+        Connection consumerConnection= getConsumerConnection(factory);
+        //create durable subs
+        MessageConsumer consumer = getConsumer(consumerConnection);
+        consumerConnection.close();
+        
+        Connection producerConnection = factory.createConnection();
+        producerConnection.start();
+        Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(getTopic(session));
+        List senderList = new ArrayList();
+        for (int i =0; i < MESSAGE_COUNT/10; i++) {
+            TextMessage msg=session.createTextMessage("test"+i);
+            senderList.add(msg);
+            producer.send(msg);
+        }
+        
+        
+        //now consume the messages
+        consumerConnection= getConsumerConnection(factory);
+        //create durable subs
+        consumer = getConsumer(consumerConnection);
+        final List consumerList = new ArrayList();
+        
+        final CountDownLatch latch = new CountDownLatch(1);
+        consumer.setMessageListener(new MessageListener() {
+
+            public void onMessage(Message msg){
+                try{
+                    //sleep to act as a slow consumer
+                    //which will force a mix of direct and polled dispatching
+                    //using the cursor on the broker
+                    Thread.sleep(50);
+                }catch(Exception e){
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+                consumerList.add(msg);
+                if (consumerList.size()==MESSAGE_COUNT) {
+                    latch.countDown();
+                }
+                
+            }
+            
+        });
+        for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) {
+            TextMessage msg=session.createTextMessage("test"+i);
+            senderList.add(msg);
+            producer.send(msg);
+        }   
+        
+        
+        latch.await(300000,TimeUnit.MILLISECONDS);
+        assertEquals("Still dipatching - count down latch not sprung" , latch.getCount(),0);
+        assertEquals("cosumerList - expected: " + MESSAGE_COUNT + " but was: " + consumerList.size(),consumerList.size(),senderList.size());
+        assertEquals(senderList,consumerList);
+        producerConnection.close();
+        consumerConnection.close();       
+    }
+    
+    
+
+    protected Topic getTopic(Session session) throws JMSException{
+        String topicName=getClass().getName();
+        return session.createTopic(topicName);
+    }
+    
+    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{
+        Connection connection=fac.createConnection();
+        connection.setClientID("testConsumer");
+        connection.start();
+        return connection;
+        
+    }
+    
+    protected MessageConsumer getConsumer(Connection connection) throws Exception{
+        Session consumerSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        Topic topic = getTopic(consumerSession);
+        MessageConsumer  consumer = consumerSession.createDurableSubscriber(topic,"testConsumer");
+        return consumer;
+    }
+
+    
+
+    protected void setUp() throws Exception{
+        if(broker==null){
+            broker=createBroker();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception{
+        super.tearDown();
+        
+        if(broker!=null){
+          broker.stop();
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
+        ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress);
+        Properties props = new Properties();
+        props.setProperty("prefetchPolicy.durableTopicPrefetch","" + PREFETCH_SIZE);
+        props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch","" + PREFETCH_SIZE);
+        cf.setProperties(props);
+        return cf;
+    }
+    
+   
+
+    protected BrokerService createBroker() throws Exception{
+        BrokerService answer=new BrokerService();
+        configureBroker(answer);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception{
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java?view=auto&rev=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java Mon Oct  9 06:05:20 2006
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * @version $Revision: 1.3 $
+ */
+public class KahaCursorDurableTest extends CursorDurableTest{
+    
+    protected static final Log log = LogFactory.getLog(KahaCursorDurableTest.class);
+
+    
+
+    protected void configureBroker(BrokerService answer) throws Exception{
+        KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
+        answer.setPersistenceAdapter(adaptor);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}