You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/09 20:15:26 UTC

svn commit: r1501420 - /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Author: tabish
Date: Tue Jul  9 18:15:26 2013
New Revision: 1501420

URL: http://svn.apache.org/r1501420
Log:
Fix for failing LveelDB unit tests where only non-persistent messages are sent in a TX.  The preCommit wasn't being run so the Queue's orderIndexUpdates structure wasn't getting updated with the TX to process in the postCommit phase.  

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1501420&r1=1501419&r2=1501420&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Tue Jul  9 18:15:26 2013
@@ -49,7 +49,7 @@ object LevelDBStore extends Log {
 
   val DONE = new CountDownFuture[AnyRef]();
   DONE.set(null)
-  
+
   def toIOException(e: Throwable): IOException = {
     if (e.isInstanceOf[ExecutionException]) {
       var cause: Throwable = (e.asInstanceOf[ExecutionException]).getCause
@@ -143,7 +143,7 @@ class LevelDBStore extends LockableServi
   var directory = DEFAULT_DIRECTORY
   @BeanProperty
   var logDirectory: File = null
-  
+
   @BeanProperty
   var logSize: Long = 1024 * 1024 * 100
   @BeanProperty
@@ -301,13 +301,13 @@ class LevelDBStore extends LockableServi
   def createTransactionStore = this
 
   val transactions = new ConcurrentHashMap[TransactionId, Transaction]()
-  
+
   trait TransactionAction {
     def commit(uow:DelayableUOW):Unit
     def prepare(uow:DelayableUOW):Unit
     def rollback(uow:DelayableUOW):Unit
   }
-  
+
   case class Transaction(id:TransactionId) {
     val commitActions = ListBuffer[TransactionAction]()
 
@@ -405,7 +405,7 @@ class LevelDBStore extends LockableServi
       }
     }
   }
-  
+
   def transaction(txid: TransactionId) = {
     var rc = transactions.get(txid)
     if( rc == null ) {
@@ -417,10 +417,12 @@ class LevelDBStore extends LockableServi
     }
     rc
   }
-  
+
   def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = {
     transactions.remove(txid) match {
       case null =>
+        // Only in-flight non-persistent messages in this TX.
+        preCommit.run()
         postCommit.run()
       case tx =>
         val done = new CountDownLatch(1)
@@ -715,8 +717,8 @@ class LevelDBStore extends LockableServi
         db.removeSubscription(sub)
     }
   }
-  
-  
+
+
   def getTopicGCPositions = {
     import collection.JavaConversions._
     val topics = this.synchronized {
@@ -755,7 +757,7 @@ class LevelDBStore extends LockableServi
         }
       }
     }
-    
+
     def addSubsciption(info: SubscriptionInfo, retroactive: Boolean) = {
       var sub = db.addSubscription(key, info)
       subscriptions.synchronized {
@@ -768,7 +770,7 @@ class LevelDBStore extends LockableServi
         uow.countDownFuture
       })
     }
-    
+
     def getAllSubscriptions: Array[SubscriptionInfo] = subscriptions.synchronized {
       subscriptions.values.map(_.info).toArray
     }
@@ -808,7 +810,7 @@ class LevelDBStore extends LockableServi
 
       }
     }
-    
+
     def resetBatching(clientId: String, subscriptionName: String): Unit = {
       lookup(clientId, subscriptionName).foreach { sub =>
         sub.cursorPosition = 0
@@ -819,13 +821,13 @@ class LevelDBStore extends LockableServi
         sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
       }
     }
-    
+
     def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
       lookup(clientId, subscriptionName).foreach { sub =>
         sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), maxReturned)
       }
     }
-    
+
     def getMessageCount(clientId: String, subscriptionName: String): Int = {
       lookup(clientId, subscriptionName) match {
         case Some(sub) =>
@@ -933,7 +935,7 @@ class LevelDBStore extends LockableServi
 
   ///////////////////////////////////////////////////////////////////////////
   // The following methods actually have nothing to do with JMS txs... It's more like
-  // operation batch.. we handle that in the DBManager tho.. 
+  // operation batch.. we handle that in the DBManager tho..
   ///////////////////////////////////////////////////////////////////////////
   def beginTransaction(context: ConnectionContext): Unit = {}
   def commitTransaction(context: ConnectionContext): Unit = {}