You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/22 22:53:28 UTC

svn commit: r1505805 - /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Author: tabish
Date: Mon Jul 22 20:53:27 2013
New Revision: 1505805

URL: http://svn.apache.org/r1505805
Log:
[LevelDB]

https://issues.apache.org/jira/browse/AMQ-4296

Fixes the remainder of the failing unit tests.  The LevelDB store wasn't incrementing or decrementing reference counts on messages added to the store, which caused the expectations of certain memory-limit-based tests to fail: the memory usage was being updated after the store add instead of during it, so a message could get placed into the batch list of a cursor when we did not expect that it would.  This could also cause a browse to return fewer messages than we want, as the in-memory messages would top out the usage limit so we'd never page in one batch of messages.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1505805&r1=1505804&r2=1505805&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Mon Jul 22 20:53:27 2013
@@ -624,10 +624,13 @@ class LevelDBStore extends LockableServi
 
     def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
       val seq = lastSeq.incrementAndGet()
+      message.incrementReferenceCount()
+      uow.addCompleteListener({
+        message.decrementReferenceCount()
+      })
       uow.enqueue(key, seq, message, delay)
     }
 
-
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
       message.getMessageId.setEntryLocator(null)
@@ -718,7 +721,6 @@ class LevelDBStore extends LockableServi
     }
   }
 
-
   def getTopicGCPositions = {
     import collection.JavaConversions._
     val topics = this.synchronized {