You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/07/25 17:25:32 UTC

svn commit: r1613449 - /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java

Author: rgodfrey
Date: Fri Jul 25 15:25:32 2014
New Revision: 1613449

URL: http://svn.apache.org/r1613449
Log:
QPID-5907 : [Java Broker] Remove unreferenced messages from the store in asynchronous store recoverer process

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1613449&r1=1613448&r2=1613449&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Fri Jul 25 15:25:32 2014
@@ -43,6 +43,7 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.server.txn.DtxBranch;
 import org.apache.qpid.server.txn.DtxRegistry;
@@ -141,10 +142,26 @@ public class AsynchronousMessageStoreRec
         private synchronized void completeRecovery()
         {
             // at this point nothing should be writing to the map of recovered messages
-            for (MessageReference<? extends ServerMessage<?>> entry : _recoveredMessages.values())
+            for (Map.Entry<Long,MessageReference<? extends ServerMessage<?>>> entry : _recoveredMessages.entrySet())
             {
-                entry.release();
+                entry.getValue().release();
+                entry.setValue(null); // free up any memory associated with the reference object
             }
+            getStore().visitMessages(new MessageHandler()
+            {
+                @Override
+                public boolean handle(final StoredMessage<?> storedMessage)
+                {
+
+                    long messageNumber = storedMessage.getMessageNumber();
+                    if(!_recoveredMessages.containsKey(messageNumber))
+                    {
+                        _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing....");
+                        storedMessage.remove();
+                    }
+                    return messageNumber <_maxMessageId-1;
+                }
+            });
             _recoveredMessages.clear();
         }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org