You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/07/29 21:01:32 UTC
svn commit: r560783 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/jmx/ broker/region/ broker/region/cursors/ store/ store/amq/
store/jdbc/ store/jdbc/adapter/ store/kahadaptor/ store/memory/
Author: rajdavies
Date: Sun Jul 29 12:01:29 2007
New Revision: 560783
URL: http://svn.apache.org/viewvc?view=rev&rev=560783
Log:
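Fix for http://issues.apache.org/activemq/browse/AMQ-1080 - recoverMessage() and recoverMessageReference() on MessageRecoveryListener and JDBCMessageRecoveryListener now return a boolean (false tells the store to stop recovering, e.g. when the listener has no space), and the finished() callback is removed from both listener interfaces.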
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Sun Jul 29 12:01:29 2007
@@ -421,11 +421,12 @@
ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName());
TopicMessageStore store=adapter.createTopicMessageStore(topic);
store.recover(new MessageRecoveryListener(){
- public void recoverMessage(Message message) throws Exception{
+ public boolean recoverMessage(Message message) throws Exception{
result.add(message);
+ return true;
}
- public void recoverMessageReference(MessageId messageReference) throws Exception{
+ public boolean recoverMessageReference(MessageId messageReference) throws Exception{
throw new RuntimeException("Should not be called.");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Sun Jul 29 12:01:29 2007
@@ -135,25 +135,29 @@
if(messages.isRecoveryRequired()){
store.recover(new MessageRecoveryListener(){
- public void recoverMessage(Message message){
+ public boolean recoverMessage(Message message){
// Message could have expired while it was being loaded..
if(message.isExpired()){
broker.messageExpired(createConnectionContext(),message);
destinationStatistics.getMessages().decrement();
- return;
+ return true;
}
- message.setRegionDestination(Queue.this);
- synchronized(messages){
- try{
- messages.addMessageLast(message);
- }catch(Exception e){
- log.fatal("Failed to add message to cursor",e);
+ if(hasSpace()){
+ message.setRegionDestination(Queue.this);
+ synchronized(messages){
+ try{
+ messages.addMessageLast(message);
+ }catch(Exception e){
+ log.fatal("Failed to add message to cursor",e);
+ }
}
+ destinationStatistics.getMessages().increment();
+ return true;
}
- destinationStatistics.getMessages().increment();
+ return false;
}
- public void recoverMessageReference(MessageId messageReference) throws Exception{
+ public boolean recoverMessageReference(MessageId messageReference) throws Exception{
throw new RuntimeException("Should not be called.");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Sun Jul 29 12:01:29 2007
@@ -190,7 +190,7 @@
msgContext.setDestination(destination);
if(subscription.isRecoveryRequired()){
store.recoverSubscription(clientId,subscriptionName,new MessageRecoveryListener(){
- public void recoverMessage(Message message) throws Exception{
+ public boolean recoverMessage(Message message) throws Exception{
message.setRegionDestination(Topic.this);
try{
msgContext.setMessageReference(message);
@@ -203,9 +203,10 @@
// TODO: Need to handle this better.
e.printStackTrace();
}
+ return true;
}
- public void recoverMessageReference(MessageId messageReference) throws Exception{
+ public boolean recoverMessageReference(MessageId messageReference) throws Exception{
throw new RuntimeException("Should not be called.");
}
@@ -426,11 +427,14 @@
try{
if(store!=null){
store.recover(new MessageRecoveryListener(){
- public void recoverMessage(Message message) throws Exception{
+ public boolean recoverMessage(Message message) throws Exception{
result.add(message);
+ return true;
}
- public void recoverMessageReference(MessageId messageReference) throws Exception{}
+ public boolean recoverMessageReference(MessageId messageReference) throws Exception{
+ return true;
+ }
public void finished(){}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Sun Jul 29 12:01:29 2007
@@ -130,16 +130,17 @@
public void finished(){
}
- public void recoverMessage(Message message) throws Exception{
+ public boolean recoverMessage(Message message) throws Exception{
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
batchList.addLast(message);
+ return true;
}
- public void recoverMessageReference(MessageId messageReference) throws Exception {
+ public boolean recoverMessageReference(MessageId messageReference) throws Exception {
Message msg=store.getMessage(messageReference);
if(msg!=null){
- recoverMessage(msg);
+ return recoverMessage(msg);
}else{
String err = "Failed to retrieve message for id: "+messageReference;
log.error(err);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Sun Jul 29 12:01:29 2007
@@ -159,16 +159,17 @@
public void finished(){
}
- public synchronized void recoverMessage(Message message) throws Exception{
+ public synchronized boolean recoverMessage(Message message) throws Exception{
message.setRegionDestination(regionDestination);
// only increment if count is zero (could have been cached)
if(message.getReferenceCount()==0){
message.incrementReferenceCount();
}
batchList.addLast(message);
+ return true;
}
- public void recoverMessageReference(MessageId messageReference) throws Exception{
+ public boolean recoverMessageReference(MessageId messageReference) throws Exception{
// shouldn't get called
throw new RuntimeException("Not supported");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java Sun Jul 29 12:01:29 2007
@@ -24,8 +24,7 @@
* @version $Revision: 1.4 $
*/
public interface MessageRecoveryListener {
- void recoverMessage(Message message) throws Exception;
- void recoverMessageReference(MessageId ref) throws Exception;
- void finished();
+ boolean recoverMessage(Message message) throws Exception;
+ boolean recoverMessageReference(MessageId ref) throws Exception;
boolean hasSpace();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Sun Jul 29 12:01:29 2007
@@ -34,27 +34,29 @@
this.listener=listener;
}
- public void finished(){
- listener.finished();
- }
-
+
public boolean hasSpace(){
return listener.hasSpace();
}
- public void recoverMessage(Message message) throws Exception{
- listener.recoverMessage(message);
- lastRecovered=message.getMessageId();
- count++;
+ public boolean recoverMessage(Message message) throws Exception{
+ if(listener.hasSpace()){
+ listener.recoverMessage(message);
+ lastRecovered=message.getMessageId();
+ count++;
+ return true;
+ }
+ return false;
}
- public void recoverMessageReference(MessageId ref) throws Exception{
+ public boolean recoverMessageReference(MessageId ref) throws Exception{
Message message=this.store.getMessage(ref);
if(message!=null){
- recoverMessage(message);
+ return recoverMessage(message);
}else{
log.error("Message id "+ref+" could not be recovered from the data store!");
}
+ return false;
}
MessageId getLastRecoveredMessageId() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java Sun Jul 29 12:01:29 2007
@@ -23,7 +23,6 @@
* @version $Revision: 1.3 $
*/
public interface JDBCMessageRecoveryListener {
- void recoverMessage(long sequenceId, byte[] message) throws Exception;
- void recoverMessageReference(String reference) throws Exception;
- void finished();
+ boolean recoverMessage(long sequenceId, byte[] message) throws Exception;
+ boolean recoverMessageReference(String reference) throws Exception;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Sun Jul 29 12:01:29 2007
@@ -154,16 +154,13 @@
try {
c = persistenceAdapter.getTransactionContext();
adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
- public void recoverMessage(long sequenceId, byte[] data) throws Exception {
+ public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
- listener.recoverMessage(msg);
+ return listener.recoverMessage(msg);
}
- public void recoverMessageReference(String reference) throws Exception {
- listener.recoverMessageReference(new MessageId(reference));
- }
- public void finished(){
- listener.finished();
+ public boolean recoverMessageReference(String reference) throws Exception {
+ return listener.recoverMessageReference(new MessageId(reference));
}
});
} catch (SQLException e) {
@@ -234,24 +231,25 @@
adapter.doRecoverNextMessages(c,destination,lastMessageId.get(),maxReturned,
new JDBCMessageRecoveryListener(){
- public void recoverMessage(long sequenceId,byte[] data) throws Exception{
+ public boolean recoverMessage(long sequenceId,byte[] data) throws Exception{
if(listener.hasSpace()){
Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
lastMessageId.set(sequenceId);
+ return true;
}
+ return false;
}
- public void recoverMessageReference(String reference) throws Exception{
+ public boolean recoverMessageReference(String reference) throws Exception{
if(listener.hasSpace()) {
listener.recoverMessageReference(new MessageId(reference));
+ return true;
}
+ return false;
}
- public void finished(){
- listener.finished();
- }
});
}catch(SQLException e){
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Sun Jul 29 12:01:29 2007
@@ -72,18 +72,15 @@
try {
adapter.doRecoverSubscription(c, destination, clientId, subscriptionName,
new JDBCMessageRecoveryListener() {
- public void recoverMessage(long sequenceId, byte[] data) throws Exception {
+ public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
- listener.recoverMessage(msg);
+ return listener.recoverMessage(msg);
}
- public void recoverMessageReference(String reference) throws Exception {
- listener.recoverMessageReference(new MessageId(reference));
+ public boolean recoverMessageReference(String reference) throws Exception {
+ return listener.recoverMessageReference(new MessageId(reference));
}
- public void finished(){
- listener.finished();
- }
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
@@ -108,22 +105,21 @@
adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned,
new JDBCMessageRecoveryListener(){
- public void recoverMessage(long sequenceId,byte[] data) throws Exception{
+ public boolean recoverMessage(long sequenceId,byte[] data) throws Exception{
if(listener.hasSpace()){
Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
finalLast.set(sequenceId);
+ return true;
}
+ return false;
}
- public void recoverMessageReference(String reference) throws Exception{
- listener.recoverMessageReference(new MessageId(reference));
+ public boolean recoverMessageReference(String reference) throws Exception{
+ return listener.recoverMessageReference(new MessageId(reference));
}
- public void finished(){
- listener.finished();
- }
});
}catch(SQLException e){
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Sun Jul 29 12:01:29 2007
@@ -297,17 +297,20 @@
rs=s.executeQuery();
if(statements.isUseExternalMessageReferences()){
while(rs.next()){
- listener.recoverMessageReference(rs.getString(2));
+ if (!listener.recoverMessageReference(rs.getString(2))) {
+ break;
+ }
}
}else{
while(rs.next()){
- listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
+ if(!listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))) {
+ break;
+ }
}
}
}finally{
close(rs);
close(s);
- listener.finished();
}
}
@@ -350,17 +353,20 @@
rs=s.executeQuery();
if(statements.isUseExternalMessageReferences()){
while(rs.next()){
- listener.recoverMessageReference(rs.getString(2));
+ if (!listener.recoverMessageReference(rs.getString(2))){
+ break;
+ }
}
}else{
while(rs.next()){
- listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
+ if (!listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))) {
+ break;
+ }
}
}
}finally{
close(rs);
close(s);
- listener.finished();
}
}
@@ -379,19 +385,24 @@
int count=0;
if(statements.isUseExternalMessageReferences()){
while(rs.next()&&count<maxReturned){
- listener.recoverMessageReference(rs.getString(1));
- count++;
+ if(listener.recoverMessageReference(rs.getString(1))){
+ count++;
+ }else{
+ break;
+ }
}
}else{
while(rs.next()&&count<maxReturned){
- listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
- count++;
+ if(listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))){
+ count++;
+ }else{
+ break;
+ }
}
}
}finally{
close(rs);
close(s);
- listener.finished();
}
}
@@ -657,7 +668,8 @@
}
- public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
+ public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,
+ int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
PreparedStatement s=null;
ResultSet rs=null;
try{
@@ -669,23 +681,27 @@
int count=0;
if(statements.isUseExternalMessageReferences()){
while(rs.next()&&count<maxReturned){
- listener.recoverMessageReference(rs.getString(1));
- count++;
+ if(listener.recoverMessageReference(rs.getString(1))){
+ count++;
+ }else{
+ log.debug("Stopped recover next messages");
+ }
}
}else{
while(rs.next()&&count<maxReturned){
- listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
- count++;
+ if(listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))){
+ count++;
+ }else{
+ log.debug("Stopped recover next messages");
+ }
}
}
- }catch(Exception e) {
+ }catch(Exception e){
e.printStackTrace();
- }finally {
+ }finally{
close(rs);
close(s);
- listener.finished();
}
-
}
/*
* Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Sun Jul 29 12:01:29 2007
@@ -66,8 +66,12 @@
return result;
}
- protected void recoverMessage(MessageRecoveryListener listener,Message msg) throws Exception{
- listener.recoverMessage(msg);
+ protected boolean recoverMessage(MessageRecoveryListener listener,Message msg) throws Exception{
+ if(listener.hasSpace()){
+ listener.recoverMessage(msg);
+ return true;
+ }
+ return false;
}
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
@@ -89,9 +93,10 @@
public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
Message msg=(Message)messageContainer.getValue(entry);
- recoverMessage(listener,msg);
+ if(!recoverMessage(listener,msg)) {
+ break;
+ }
}
- listener.finished();
}
public void start(){
@@ -167,7 +172,6 @@
entry=messageContainer.getNext(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
- listener.finished();
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Sun Jul 29 12:01:29 2007
@@ -58,16 +58,21 @@
throw new RuntimeException("Use addMessageReference instead");
}
- protected final void recoverReference(MessageRecoveryListener listener,ReferenceRecord record) throws Exception{
- listener.recoverMessageReference(new MessageId(record.getMessageId()));
+ protected final boolean recoverReference(MessageRecoveryListener listener,ReferenceRecord record) throws Exception{
+ if (listener.hasSpace()) {
+ listener.recoverMessageReference(new MessageId(record.getMessageId()));
+ return true;
+ }
+ return false;
}
public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord record=messageContainer.getValue(entry);
- recoverReference(listener,record);
+ if (!recoverReference(listener,record)) {
+ break;
+ }
}
- listener.finished();
}
public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
@@ -95,7 +100,6 @@
entry=messageContainer.getNext(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
- listener.finished();
}
public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Sun Jul 29 12:01:29 2007
@@ -148,11 +148,12 @@
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
Message msg=messageContainer.get(ref.getMessageEntry());
if(msg!=null){
- recoverMessage(listener, msg);
+ if(!recoverMessage(listener,msg)){
+ break;
+ }
}
}
}
- listener.finished();
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
@@ -186,7 +187,6 @@
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
}
- listener.finished();
}
public void delete(){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Sun Jul 29 12:01:29 2007
@@ -226,12 +226,10 @@
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
}
- listener.finished();
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
-
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
@@ -239,11 +237,12 @@
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
ReferenceRecord msg=messageContainer.get(ref.getMessageEntry());
if(msg!=null){
- recoverReference(listener,msg);
+ if(!recoverReference(listener,msg)){
+ break;
+ }
}
}
}
- listener.finished();
}
public synchronized void resetBatching(String clientId,String subscriptionName){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Sun Jul 29 12:01:29 2007
@@ -95,7 +95,6 @@
listener.recoverMessage((Message)msg);
}
}
- listener.finished();
}
}
@@ -150,7 +149,6 @@
pastLackBatch=entry.getKey().equals(lastBatchId);
}
}
- listener.finished();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?view=diff&rev=560783&r1=560782&r2=560783
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Sun Jul 29 12:01:29 2007
@@ -57,7 +57,6 @@
listener.recoverMessage((Message)msg);
}
}
- listener.finished();
}
void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
@@ -83,7 +82,7 @@
if(lastId!=null){
lastBatch=lastId;
}
- listener.finished();
+
}
void resetBatching(){