You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/07/29 21:01:32 UTC

svn commit: r560783 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/jmx/ broker/region/ broker/region/cursors/ store/ store/amq/ store/jdbc/ store/jdbc/adapter/ store/kahadaptor/ store/memory/

Author: rajdavies
Date: Sun Jul 29 12:01:29 2007
New Revision: 560783

URL: http://svn.apache.org/viewvc?view=rev&rev=560783
Log:
Fix for http://issues.apache.org/activemq/browse/AMQ-1080

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Sun Jul 29 12:01:29 2007
@@ -421,11 +421,12 @@
             ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName());
             TopicMessageStore store=adapter.createTopicMessageStore(topic);
             store.recover(new MessageRecoveryListener(){
-                public void recoverMessage(Message message) throws Exception{
+                public boolean recoverMessage(Message message) throws Exception{
                     result.add(message);
+                    return true;
                 }
 
-                public void recoverMessageReference(MessageId messageReference) throws Exception{
+                public boolean recoverMessageReference(MessageId messageReference) throws Exception{
                 	throw new RuntimeException("Should not be called.");
                 }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Sun Jul 29 12:01:29 2007
@@ -135,25 +135,29 @@
             if(messages.isRecoveryRequired()){
                 store.recover(new MessageRecoveryListener(){
 
-                    public void recoverMessage(Message message){
+                    public boolean recoverMessage(Message message){
                         // Message could have expired while it was being loaded..
                         if(message.isExpired()){
                             broker.messageExpired(createConnectionContext(),message);
                             destinationStatistics.getMessages().decrement();
-                            return;
+                            return true;
                         }
-                        message.setRegionDestination(Queue.this);
-                        synchronized(messages){
-                            try{
-                                messages.addMessageLast(message);
-                            }catch(Exception e){
-                                log.fatal("Failed to add message to cursor",e);
+                        if(hasSpace()){
+                            message.setRegionDestination(Queue.this);
+                            synchronized(messages){
+                                try{
+                                    messages.addMessageLast(message);
+                                }catch(Exception e){
+                                    log.fatal("Failed to add message to cursor",e);
+                                }
                             }
+                            destinationStatistics.getMessages().increment();
+                            return true;
                         }
-                        destinationStatistics.getMessages().increment();
+                        return false;
                     }
 
-                    public void recoverMessageReference(MessageId messageReference) throws Exception{
+                    public boolean recoverMessageReference(MessageId messageReference) throws Exception{
                         throw new RuntimeException("Should not be called.");
                     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Sun Jul 29 12:01:29 2007
@@ -190,7 +190,7 @@
             msgContext.setDestination(destination);
             if(subscription.isRecoveryRequired()){
                 store.recoverSubscription(clientId,subscriptionName,new MessageRecoveryListener(){
-                    public void recoverMessage(Message message) throws Exception{
+                    public boolean recoverMessage(Message message) throws Exception{
                         message.setRegionDestination(Topic.this);
                         try{
                             msgContext.setMessageReference(message);
@@ -203,9 +203,10 @@
                             // TODO: Need to handle this better.
                             e.printStackTrace();
                         }
+                        return true;
                     }
 
-                    public void recoverMessageReference(MessageId messageReference) throws Exception{
+                    public boolean recoverMessageReference(MessageId messageReference) throws Exception{
                         throw new RuntimeException("Should not be called.");
                     }
 
@@ -426,11 +427,14 @@
         try{
             if(store!=null){
                 store.recover(new MessageRecoveryListener(){
-                    public void recoverMessage(Message message) throws Exception{
+                    public boolean recoverMessage(Message message) throws Exception{
                         result.add(message);
+                        return true;
                     }
 
-                    public void recoverMessageReference(MessageId messageReference) throws Exception{}
+                    public boolean  recoverMessageReference(MessageId messageReference) throws Exception{
+                        return true;
+                    }
 
                     public void finished(){}
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Sun Jul 29 12:01:29 2007
@@ -130,16 +130,17 @@
     public void finished(){
     }
 
-    public void recoverMessage(Message message) throws Exception{
+    public boolean recoverMessage(Message message) throws Exception{
         message.setRegionDestination(regionDestination);
         message.incrementReferenceCount();
         batchList.addLast(message);
+        return true;
     }
 
-    public void recoverMessageReference(MessageId messageReference) throws Exception {
+    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
         Message msg=store.getMessage(messageReference);
         if(msg!=null){
-            recoverMessage(msg);
+            return recoverMessage(msg);
         }else{
             String err = "Failed to retrieve message for id: "+messageReference;
             log.error(err);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Sun Jul 29 12:01:29 2007
@@ -159,16 +159,17 @@
     public void finished(){
     }
 
-    public synchronized void recoverMessage(Message message) throws Exception{
+    public synchronized boolean recoverMessage(Message message) throws Exception{
         message.setRegionDestination(regionDestination);
         // only increment if count is zero (could have been cached)
         if(message.getReferenceCount()==0){
             message.incrementReferenceCount();
         }
         batchList.addLast(message);
+        return true;
     }
 
-    public void recoverMessageReference(MessageId messageReference) throws Exception{
+    public boolean recoverMessageReference(MessageId messageReference) throws Exception{
         // shouldn't get called
         throw new RuntimeException("Not supported");
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java Sun Jul 29 12:01:29 2007
@@ -24,8 +24,7 @@
  * @version $Revision: 1.4 $
  */
 public interface MessageRecoveryListener {
-    void recoverMessage(Message message) throws Exception;
-    void recoverMessageReference(MessageId ref) throws Exception;
-    void finished();
+    boolean recoverMessage(Message message) throws Exception;
+    boolean recoverMessageReference(MessageId ref) throws Exception;
     boolean hasSpace();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Sun Jul 29 12:01:29 2007
@@ -34,27 +34,29 @@
         this.listener=listener;
     }
 
-    public void finished(){
-        listener.finished();
-    }
-
+    
     public boolean hasSpace(){
         return listener.hasSpace();
     }
 
-    public void recoverMessage(Message message) throws Exception{
-        listener.recoverMessage(message);
-        lastRecovered=message.getMessageId();
-        count++;
+    public boolean recoverMessage(Message message) throws Exception{
+        if(listener.hasSpace()){
+            listener.recoverMessage(message);
+            lastRecovered=message.getMessageId();
+            count++;
+            return true;
+        }
+        return false;
     }
 
-    public void recoverMessageReference(MessageId ref) throws Exception{
+    public boolean recoverMessageReference(MessageId ref) throws Exception{
         Message message=this.store.getMessage(ref);
         if(message!=null){
-            recoverMessage(message);
+           return  recoverMessage(message);
         }else{
             log.error("Message id "+ref+" could not be recovered from the data store!");
         }
+        return false;
     }
     
     MessageId getLastRecoveredMessageId() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java Sun Jul 29 12:01:29 2007
@@ -23,7 +23,6 @@
  * @version $Revision: 1.3 $
  */
 public interface JDBCMessageRecoveryListener {
-    void recoverMessage(long sequenceId, byte[] message) throws Exception;
-    void recoverMessageReference(String reference) throws Exception;
-    void finished();
+    boolean recoverMessage(long sequenceId, byte[] message) throws Exception;
+    boolean recoverMessageReference(String reference) throws Exception;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Sun Jul 29 12:01:29 2007
@@ -154,16 +154,13 @@
         try {
             c = persistenceAdapter.getTransactionContext();
             adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
-                public void recoverMessage(long sequenceId, byte[] data) throws Exception {
+                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                     Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
                     msg.getMessageId().setBrokerSequenceId(sequenceId);
-                    listener.recoverMessage(msg);
+                    return listener.recoverMessage(msg);
                 }
-                public void recoverMessageReference(String reference) throws Exception {
-                    listener.recoverMessageReference(new MessageId(reference));
-                }
-                public void finished(){
-                    listener.finished();
+                public boolean recoverMessageReference(String reference) throws Exception {
+                    return listener.recoverMessageReference(new MessageId(reference));
                 }
             });
         } catch (SQLException e) {
@@ -234,24 +231,25 @@
             adapter.doRecoverNextMessages(c,destination,lastMessageId.get(),maxReturned,
                     new JDBCMessageRecoveryListener(){
 
-                        public void recoverMessage(long sequenceId,byte[] data) throws Exception{
+                        public  boolean recoverMessage(long sequenceId,byte[] data) throws Exception{
                             if(listener.hasSpace()){
                                 Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
                                 msg.getMessageId().setBrokerSequenceId(sequenceId);
                                 listener.recoverMessage(msg);
                                 lastMessageId.set(sequenceId);
+                                return true;
                             }
+                            return false;
                         }
 
-                        public void recoverMessageReference(String reference) throws Exception{
+                        public boolean recoverMessageReference(String reference) throws Exception{
                             if(listener.hasSpace()) {
                                 listener.recoverMessageReference(new MessageId(reference));
+                                return true;
                             }
+                            return false;
                         }
 
-                        public void finished(){
-                            listener.finished();
-                        }
                     });
         }catch(SQLException e){
             JDBCPersistenceAdapter.log("JDBC Failure: ",e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Sun Jul 29 12:01:29 2007
@@ -72,18 +72,15 @@
         try {
             adapter.doRecoverSubscription(c, destination, clientId, subscriptionName,
                     new JDBCMessageRecoveryListener() {
-                        public void recoverMessage(long sequenceId, byte[] data) throws Exception {
+                        public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                             Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
                             msg.getMessageId().setBrokerSequenceId(sequenceId);
-                            listener.recoverMessage(msg);
+                            return listener.recoverMessage(msg);
                         }
-                        public void recoverMessageReference(String reference) throws Exception {
-                            listener.recoverMessageReference(new MessageId(reference));
+                        public boolean  recoverMessageReference(String reference) throws Exception {
+                            return listener.recoverMessageReference(new MessageId(reference));
                         }
                         
-                        public void finished(){
-                            listener.finished();
-                        }
                     });
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ",e);
@@ -108,22 +105,21 @@
             adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned,
                     new JDBCMessageRecoveryListener(){
 
-                        public void recoverMessage(long sequenceId,byte[] data) throws Exception{
+                        public boolean recoverMessage(long sequenceId,byte[] data) throws Exception{
                             if(listener.hasSpace()){
                                 Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
                                 msg.getMessageId().setBrokerSequenceId(sequenceId);
                                 listener.recoverMessage(msg);
                                 finalLast.set(sequenceId);
+                                return true;
                             }
+                            return false;
                         }
 
-                        public void recoverMessageReference(String reference) throws Exception{
-                            listener.recoverMessageReference(new MessageId(reference));
+                        public boolean recoverMessageReference(String reference) throws Exception{
+                            return listener.recoverMessageReference(new MessageId(reference));
                         }
 
-                        public void finished(){
-                            listener.finished();
-                        }
                     });
         }catch(SQLException e){
             JDBCPersistenceAdapter.log("JDBC Failure: ",e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Sun Jul 29 12:01:29 2007
@@ -297,17 +297,20 @@
             rs=s.executeQuery();
             if(statements.isUseExternalMessageReferences()){
                 while(rs.next()){
-                    listener.recoverMessageReference(rs.getString(2));
+                    if (!listener.recoverMessageReference(rs.getString(2))) {
+                        break;
+                    }
                 }
             }else{
                 while(rs.next()){
-                    listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
+                    if(!listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))) {
+                        break;
+                    }
                 }
             }
         }finally{
             close(rs);
             close(s);
-            listener.finished();
         }
     }
 
@@ -350,17 +353,20 @@
             rs=s.executeQuery();
             if(statements.isUseExternalMessageReferences()){
                 while(rs.next()){
-                    listener.recoverMessageReference(rs.getString(2));
+                    if (!listener.recoverMessageReference(rs.getString(2))){
+                        break;
+                    }
                 }
             }else{
                 while(rs.next()){
-                    listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
+                    if (!listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))) {
+                        break;
+                    }
                 }
             }
         }finally{
             close(rs);
             close(s);
-            listener.finished();
         }
     }
 
@@ -379,19 +385,24 @@
             int count=0;
             if(statements.isUseExternalMessageReferences()){
                 while(rs.next()&&count<maxReturned){
-                    listener.recoverMessageReference(rs.getString(1));
-                    count++;
+                    if(listener.recoverMessageReference(rs.getString(1))){
+                        count++;
+                    }else{
+                        break;
+                    }
                 }
             }else{
                 while(rs.next()&&count<maxReturned){
-                    listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
-                    count++;
+                    if(listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))){
+                        count++;
+                    }else{
+                        break;
+                    }
                 }
             }
         }finally{
             close(rs);
             close(s);
-            listener.finished();
         }
     }
 
@@ -657,7 +668,8 @@
     }
 
     
-    public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
+    public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,
+            int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
         PreparedStatement s=null;
         ResultSet rs=null;
         try{
@@ -669,23 +681,27 @@
             int count=0;
             if(statements.isUseExternalMessageReferences()){
                 while(rs.next()&&count<maxReturned){
-                    listener.recoverMessageReference(rs.getString(1));
-                    count++;
+                    if(listener.recoverMessageReference(rs.getString(1))){
+                        count++;
+                    }else{
+                        log.debug("Stopped recover next messages");
+                    }
                 }
             }else{
                 while(rs.next()&&count<maxReturned){
-                    listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
-                    count++;
+                    if(listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))){
+                        count++;
+                    }else{
+                        log.debug("Stopped recover next messages");
+                    }
                 }
             }
-        }catch(Exception e) {
+        }catch(Exception e){
             e.printStackTrace();
-        }finally {
+        }finally{
             close(rs);
             close(s);
-            listener.finished();
         }
-        
     }
     /*
      * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Sun Jul 29 12:01:29 2007
@@ -66,8 +66,12 @@
         return result;
     }
 
-    protected void recoverMessage(MessageRecoveryListener listener,Message msg) throws Exception{
-        listener.recoverMessage(msg);
+    protected boolean recoverMessage(MessageRecoveryListener listener,Message msg) throws Exception{
+        if(listener.hasSpace()){
+            listener.recoverMessage(msg);
+            return true;
+        }
+        return false;
     }
 
     public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
@@ -89,9 +93,10 @@
     public synchronized void recover(MessageRecoveryListener listener) throws Exception{
         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
             Message msg=(Message)messageContainer.getValue(entry);
-            recoverMessage(listener,msg);
+            if(!recoverMessage(listener,msg)) {
+                break;
+            }
         }
-        listener.finished();
     }
 
     public void start(){
@@ -167,7 +172,6 @@
                 entry=messageContainer.getNext(entry);
             }while(entry!=null&&count<maxReturned&&listener.hasSpace());
         }
-        listener.finished();
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Sun Jul 29 12:01:29 2007
@@ -58,16 +58,21 @@
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    protected final void recoverReference(MessageRecoveryListener listener,ReferenceRecord record) throws Exception{
-        listener.recoverMessageReference(new MessageId(record.getMessageId()));
+    protected final boolean recoverReference(MessageRecoveryListener listener,ReferenceRecord record) throws Exception{
+        if (listener.hasSpace()) {
+            listener.recoverMessageReference(new MessageId(record.getMessageId()));
+            return true;
+        }
+        return false;
     }
 
     public synchronized void recover(MessageRecoveryListener listener) throws Exception{
         for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
             ReferenceRecord record=messageContainer.getValue(entry);
-            recoverReference(listener,record);
+            if (!recoverReference(listener,record)) {
+                break;
+            }
         }
-        listener.finished();
     }
 
     public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
@@ -95,7 +100,6 @@
                 entry=messageContainer.getNext(entry);
             }while(entry!=null&&count<maxReturned&&listener.hasSpace());
         }
-        listener.finished();
     }
 
     public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Sun Jul 29 12:01:29 2007
@@ -148,11 +148,12 @@
                 ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
                 Message msg=messageContainer.get(ref.getMessageEntry());
                 if(msg!=null){
-                	recoverMessage(listener, msg);
+                    if(!recoverMessage(listener,msg)){
+                        break;
+                    }
                 }
             }
         }
-        listener.finished();
     }
 
 	public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
@@ -186,7 +187,6 @@
                 }while(entry!=null&&count<maxReturned&&listener.hasSpace());
             }
         }
-        listener.finished();
     }
 
     public void delete(){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Sun Jul 29 12:01:29 2007
@@ -226,12 +226,10 @@
                 }while(entry!=null&&count<maxReturned&&listener.hasSpace());
             }
         }
-        listener.finished();
     }
 
     public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
             throws Exception{
-        
         String key=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
         if(container!=null){
@@ -239,11 +237,12 @@
                 ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
                 ReferenceRecord msg=messageContainer.get(ref.getMessageEntry());
                 if(msg!=null){
-                    recoverReference(listener,msg);
+                    if(!recoverReference(listener,msg)){
+                        break;
+                    }
                 }
             }
         }
-        listener.finished();
     }
 
     public synchronized void resetBatching(String clientId,String subscriptionName){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Sun Jul 29 12:01:29 2007
@@ -95,7 +95,6 @@
                     listener.recoverMessage((Message)msg);
                 }
             }
-            listener.finished();
         }
     }
 
@@ -150,7 +149,6 @@
                     pastLackBatch=entry.getKey().equals(lastBatchId);
                 }
             }
-            listener.finished();
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Sun Jul 29 12:01:29 2007
@@ -57,7 +57,6 @@
                 listener.recoverMessage((Message)msg);
             }
         }
-        listener.finished();
     }
 
     void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
@@ -83,7 +82,7 @@
         if(lastId!=null){
             lastBatch=lastId;
         }
-        listener.finished();
+
     }
 
     void resetBatching(){