You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/07/25 17:25:32 UTC
svn commit: r1613449 -
/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
Author: rgodfrey
Date: Fri Jul 25 15:25:32 2014
New Revision: 1613449
URL: http://svn.apache.org/r1613449
Log:
QPID-5907 : [Java Broker] Remove unreferenced messages from the store in asynchronous store recoverer process
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1613449&r1=1613448&r2=1613449&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Fri Jul 25 15:25:32 2014
@@ -43,6 +43,7 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
@@ -141,10 +142,26 @@ public class AsynchronousMessageStoreRec
private synchronized void completeRecovery()
{
// at this point nothing should be writing to the map of recovered messages
- for (MessageReference<? extends ServerMessage<?>> entry : _recoveredMessages.values())
+ for (Map.Entry<Long,MessageReference<? extends ServerMessage<?>>> entry : _recoveredMessages.entrySet())
{
- entry.release();
+ entry.getValue().release();
+ entry.setValue(null); // free up any memory associated with the reference object
}
+ getStore().visitMessages(new MessageHandler()
+ {
+ @Override
+ public boolean handle(final StoredMessage<?> storedMessage)
+ {
+
+ long messageNumber = storedMessage.getMessageNumber();
+ if(!_recoveredMessages.containsKey(messageNumber))
+ {
+ _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing....");
+ storedMessage.remove();
+ }
+ return messageNumber <_maxMessageId-1;
+ }
+ });
_recoveredMessages.clear();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org