You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/16 15:50:35 UTC

svn commit: r1483369 - /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Author: chirino
Date: Thu May 16 13:50:35 2013
New Revision: 1483369

URL: http://svn.apache.org/r1483369
Log:
Additional fix for AMQ-4535: It seems the store was getting out of sync with the cursor due to thread-unsafe access in the LevelDB store.

Switched to a ConcurrentHashMap to track the transactions, since that map is accessed by multiple threads.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1483369&r1=1483368&r2=1483369&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Thu May 16 13:50:35 2013
@@ -230,6 +230,7 @@ class LevelDBStore extends LockableServi
     db.loadCollections
 
     // Finish recovering the prepared XA transactions.
+    import collection.JavaConversions._
     for( (txid, transaction) <- transactions ) {
       assert( transaction.xacontainer_id != -1 )
       val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
@@ -290,7 +291,7 @@ class LevelDBStore extends LockableServi
 
   def createTransactionStore = this
 
-  val transactions = collection.mutable.HashMap[TransactionId, Transaction]()
+  val transactions = new ConcurrentHashMap[TransactionId, Transaction]()
   
   trait TransactionAction {
     def commit(uow:DelayableUOW):Unit
@@ -395,14 +396,24 @@ class LevelDBStore extends LockableServi
     }
   }
   
-  def transaction(txid: TransactionId) = transactions.getOrElseUpdate(txid, Transaction(txid))
+  def transaction(txid: TransactionId) = {
+    var rc = transactions.get(txid)
+    if( rc == null ) {
+      rc = Transaction(txid)
+      val prev = transactions.putIfAbsent(txid, rc)
+      if (prev!=null) {
+        rc = prev
+      }
+    }
+    rc
+  }
   
   def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = {
     preCommit.run()
     transactions.remove(txid) match {
-      case None=>
+      case null =>
         postCommit.run()
-      case Some(tx)=>
+      case tx =>
         val done = new CountDownLatch(1)
         withUow { uow =>
           for( action <- tx.commitActions ) {
@@ -421,9 +432,9 @@ class LevelDBStore extends LockableServi
 
   def rollback(txid: TransactionId) = {
     transactions.remove(txid) match {
-      case None=>
+      case null =>
         println("The transaction does not exist")
-      case Some(tx)=>
+      case tx =>
         if( tx.prepared ) {
           val done = new CountDownLatch(1)
           withUow { uow =>
@@ -441,9 +452,9 @@ class LevelDBStore extends LockableServi
 
   def prepare(tx: TransactionId) = {
     transactions.get(tx) match {
-      case None=>
+      case null =>
         println("The transaction does not exist")
-      case Some(tx)=>
+      case tx =>
         tx.prepare
     }
   }
@@ -452,6 +463,7 @@ class LevelDBStore extends LockableServi
   def recover(listener: TransactionRecoveryListener) = {
     this.doingRecover = true
     try {
+      import collection.JavaConversions._
       for ( (txid, transaction) <- transactions ) {
         if( transaction.prepared ) {
           val (msgs, acks) = transaction.xarecovery