You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/06/06 01:01:14 UTC

svn commit: r1490065 - in /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb: DBManager.scala LevelDBStore.scala

Author: chirino
Date: Wed Jun  5 23:01:13 2013
New Revision: 1490065

URL: http://svn.apache.org/r1490065
Log:
Fixes a ghost messages issue where the queue cursor goes out of sync /w the leveldb store when transactions are being used.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1490065&r1=1490064&r2=1490065&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Wed Jun  5 23:01:13 2013
@@ -331,9 +331,7 @@ class DelayableUOW(val manager:DBManager
       val s = size
       if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
         asyncCapacityUsed = s
-        manager.parent.blocking_executor.execute(^{
-          complete_listeners.foreach(_())
-        })
+        complete_listeners.foreach(_())
       } else {
         manager.asyncCapacityRemaining.addAndGet(s)
       }
@@ -352,9 +350,7 @@ class DelayableUOW(val manager:DBManager
         asyncCapacityUsed = 0
       } else {
         manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
-        manager.parent.blocking_executor.execute(^{
-          complete_listeners.foreach(_())
-        })
+        complete_listeners.foreach(_())
       }
       countDownFuture.set(null)
 
@@ -665,19 +661,15 @@ class DBManager(val parent:LevelDBStore)
     client.collectionIsEmpty(key)
   }
   
-  def cursorMessages(key:Long, listener:MessageRecoveryListener, startPos:Long) = {
+  def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, max:Long=Long.MaxValue) = {
     var lastmsgid:MessageId = null
+    var count = 0L
     client.queueCursor(key, startPos) { msg =>
-      if( listener.hasSpace ) {
-        if( listener.recoverMessage(msg) ) {
-          lastmsgid = msg.getMessageId
-          true
-        } else {
-          false
-        }
-      } else {
-        false
+      if( !preparedAcks.contains(msg.getMessageId) && listener.recoverMessage(msg) ) {
+        lastmsgid = msg.getMessageId
+        count += 1
       }
+      count < max
     }
     if( lastmsgid==null ) {
       startPos

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1490065&r1=1490064&r2=1490065&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Wed Jun  5 23:01:13 2013
@@ -421,18 +421,23 @@ class LevelDBStore extends LockableServi
   }
   
   def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = {
-    preCommit.run()
     transactions.remove(txid) match {
       case null =>
         postCommit.run()
       case tx =>
         val done = new CountDownLatch(1)
-        withUow { uow =>
-          for( action <- tx.commitActions ) {
-            action.commit(uow)
+        // Ugly synchronization hack to make sure messages are ordered the way the cursor expects them.
+        transactions.synchronized {
+          withUow { uow =>
+            for( action <- tx.commitActions ) {
+              action.commit(uow)
+            }
+            uow.syncFlag = true
+            uow.addCompleteListener {
+              preCommit.run()
+              done.countDown()
+            }
           }
-          uow.syncFlag = true
-          uow.addCompleteListener { done.countDown() }
         }
         done.await()
         if( tx.prepared ) {
@@ -611,14 +616,15 @@ class LevelDBStore extends LockableServi
 
   case class LevelDBMessageStore(dest: ActiveMQDestination, val key: Long) extends AbstractMessageStore(dest) {
 
-    protected val lastSeq: AtomicLong = new AtomicLong(0)
+    val lastSeq: AtomicLong = new AtomicLong(0)
     protected var cursorPosition: Long = 0
     val preparedAcks = new HashSet[MessageId]()
 
     lastSeq.set(db.getLastQueueEntrySeq(key))
 
     def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
-      uow.enqueue(key, lastSeq.incrementAndGet, message, delay)
+      val seq = lastSeq.incrementAndGet()
+      uow.enqueue(key, seq, message, delay)
     }
 
 
@@ -680,27 +686,7 @@ class LevelDBStore extends LockableServi
     }
 
     def recover(listener: MessageRecoveryListener): Unit = {
-      cursorPosition = db.cursorMessages(key, PreparedExcluding(listener), 0)
-    }
-
-    case class PreparedExcluding(listener: MessageRecoveryListener) extends MessageRecoveryListener {
-      def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
-      def hasSpace = listener.hasSpace
-      def recoverMessageReference(ref: MessageId) = {
-        if (!preparedAcks.contains(ref)) {
-          listener.recoverMessageReference(ref)
-        } else {
-          true
-        }
-      }
-
-      def recoverMessage(message: Message) = {
-        if (!preparedAcks.contains(message.getMessageId)) {
-          listener.recoverMessage(message)
-        } else {
-          true
-        }
-      }
+      cursorPosition = db.cursorMessages(preparedAcks, key, listener, 0)
     }
 
     def resetBatching: Unit = {
@@ -708,32 +694,15 @@ class LevelDBStore extends LockableServi
     }
 
     def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
-        val limiting = LimitingRecoveryListener(maxReturned, listener)
-        val excluding = PreparedExcluding(limiting)
-        cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
+      cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned)
     }
 
     override def setBatch(id: MessageId): Unit = {
-      cursorPosition = db.queuePosition(id)+1
+      cursorPosition = db.queuePosition(id)
     }
 
   }
 
-  case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener {
-    var recovered: Int = 0
-    def hasSpace = recovered < max
-    def recoverMessage(message: Message) = {
-      recovered += 1;
-      listener.recoverMessage(message)
-    }
-    def recoverMessageReference(ref: MessageId) = {
-      recovered += 1;
-      listener.recoverMessageReference(ref)
-    }
-    def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
-  }
-  
-
   //
   // This gts called when the store is first loading up, it restores
   // the existing durable subs..
@@ -849,13 +818,13 @@ class LevelDBStore extends LockableServi
     }
     def recoverSubscription(clientId: String, subscriptionName: String, listener: MessageRecoveryListener): Unit = {
       lookup(clientId, subscriptionName).foreach { sub =>
-        sub.cursorPosition = db.cursorMessages(key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
+        sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
       }
     }
     
     def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
       lookup(clientId, subscriptionName).foreach { sub =>
-        sub.cursorPosition = db.cursorMessages(key,  PreparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1))
+        sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), maxReturned)
       }
     }